This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bc771b1fc1 [SPARK-41707][CONNECT] Implement Catalog API in Spark Connect
5bc771b1fc1 is described below

commit 5bc771b1fc1c2d9c198a49c5a882090634a774d0
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Tue Dec 27 07:59:26 2022 +0900

    [SPARK-41707][CONNECT] Implement Catalog API in Spark Connect
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to implement all Catalog APIs in Spark Connect, except
    `cache`, `uncache`, and `isCached`, because of potential design issues
    (e.g., eager vs. lazy execution). Namely, it proposes the Catalog APIs below:
    
    - `spark.catalog.currentDatabase`
    - `spark.catalog.setCurrentDatabase`
    - `spark.catalog.listDatabases`
    - `spark.catalog.listTables`
    - `spark.catalog.listFunctions`
    - `spark.catalog.listColumns`
    - `spark.catalog.getDatabase`
    - `spark.catalog.getTable`
    - `spark.catalog.getFunction`
    - `spark.catalog.databaseExists`
    - `spark.catalog.tableExists`
    - `spark.catalog.functionExists`
    - `spark.catalog.createExternalTable`
    - `spark.catalog.createTable`
    - `spark.catalog.dropTempView`
    - `spark.catalog.dropGlobalTempView`
    - `spark.catalog.recoverPartitions`
    - `spark.catalog.clearCache`
    - `spark.catalog.refreshTable`
    - `spark.catalog.refreshByPath`
    - `spark.catalog.currentCatalog`
    - `spark.catalog.setCurrentCatalog`
    - `spark.catalog.listCatalogs`
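
    For example, with a Spark Connect session created through the Python client,
    the calls below behave the same as in a regular PySpark session (a minimal
    sketch; the remote address and table name are placeholders, and the exact
    builder API shown here is an assumption for illustration):

    ```python
    from pyspark.sql import SparkSession

    # Hypothetical Spark Connect endpoint; adjust to your environment.
    spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

    spark.catalog.setCurrentDatabase("default")
    print(spark.catalog.currentDatabase())        # 'default'
    print(spark.catalog.tableExists("my_table"))  # False unless the table exists
    for table in spark.catalog.listTables():
        print(table.name, table.tableType)
    ```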
    
    The main approach is to stick to what we already have in the DataFrame API,
    following the same pattern as the existing DataFrame plumbing (see
    `DataFrame.show()` as an example).
    
    Each result is put into a DataFrame (e.g., a boolean for `tableExists`, a
    string for `currentDatabase`, a list of tables, etc.) and sent to the client
    side through Arrow batches, which lets us avoid defining another set of
    Protobuf messages for the responses.
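
    For instance, a scalar result like `tableExists` is wrapped server-side into a
    one-row, one-column DataFrame, and the client simply reads the first cell. A
    simplified sketch of the client-side pattern (condensed from
    `python/pyspark/sql/connect/catalog.py` in this diff):

    ```python
    def tableExists(self, tableName: str, dbName: Optional[str] = None) -> bool:
        # The server evaluates Catalog.tableExists, puts the boolean into a
        # single-row DataFrame, and streams it back as Arrow batches.
        pdf = self._catalog_to_pandas(plan.TableExists(table_name=tableName, db_name=dbName))
        return pdf.iloc[0].iloc[0]
    ```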
    
    I explicitly marked them as internal-only for now because we would likely need
    more design work here, for example, around the HMS Thrift definitions.
    
    In addition, this PR adds the `spark.table` API (an alias of `spark.read.table`)
    in order to reuse the existing PySpark tests, as illustrated below.
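
    A quick sketch of the alias (the table name here is a placeholder):

    ```python
    df1 = spark.table("my_table")       # new alias added in this PR
    df2 = spark.read.table("my_table")  # existing API; both return the same DataFrame
    ```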
    
    ### Why are the changes needed?
    
    For feature parity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it adds support for Spark Connect in `spark.catalog`.
    
    ### How was this patch tested?
    
    Reused the existing PySpark Catalog unit tests.
    
    Closes #39214 from HyukjinKwon/SPARK-41270.
    
    Authored-by: Hyukjin Kwon <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../src/main/protobuf/spark/connect/catalog.proto  | 199 ++++++
 .../main/protobuf/spark/connect/relations.proto    |  32 +
 .../sql/connect/planner/SparkConnectPlanner.scala  | 311 +++++++-
 dev/sparktestsupport/modules.py                    |   1 +
 python/pyspark/sql/catalog.py                      |  69 ++
 python/pyspark/sql/connect/catalog.py              | 307 ++++++++
 python/pyspark/sql/connect/plan.py                 | 783 +++++++++++++++++++++
 python/pyspark/sql/connect/proto/__init__.py       |   1 +
 python/pyspark/sql/connect/proto/catalog_pb2.py    | 396 +++++++++++
 python/pyspark/sql/connect/proto/catalog_pb2.pyi   | 722 +++++++++++++++++++
 python/pyspark/sql/connect/proto/relations_pb2.py  | 175 ++---
 python/pyspark/sql/connect/proto/relations_pb2.pyi | 220 ++++++
 python/pyspark/sql/connect/session.py              |  16 +
 python/pyspark/sql/session.py                      |   6 +
 .../sql/tests/connect/test_parity_catalog.py       |  69 ++
 python/pyspark/sql/tests/test_catalog.py           |  86 ++-
 python/pyspark/testing/sqlutils.py                 |   6 +-
 17 files changed, 3270 insertions(+), 129 deletions(-)

diff --git a/connector/connect/common/src/main/protobuf/spark/connect/catalog.proto b/connector/connect/common/src/main/protobuf/spark/connect/catalog.proto
new file mode 100644
index 00000000000..cf5eb403754
--- /dev/null
+++ b/connector/connect/common/src/main/protobuf/spark/connect/catalog.proto
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = 'proto3';
+
+package spark.connect;
+
+import "spark/connect/types.proto";
+
+option java_multiple_files = true;
+option java_package = "org.apache.spark.connect.proto";
+
+// See `spark.catalog.currentDatabase`
+message CurrentDatabase { }
+
+// See `spark.catalog.setCurrentDatabase`
+message SetCurrentDatabase {
+  // (Required)
+  string db_name = 1;
+}
+
+// See `spark.catalog.listDatabases`
+message ListDatabases { }
+
+// See `spark.catalog.listTables`
+message ListTables {
+  // (Optional)
+  optional string db_name = 1;
+}
+
+// See `spark.catalog.listFunctions`
+message ListFunctions {
+  // (Optional)
+  optional string db_name = 1;
+}
+
+// See `spark.catalog.listColumns`
+message ListColumns {
+  // (Required)
+  string table_name = 1;
+  // (Optional)
+  optional string db_name = 2;
+}
+
+// See `spark.catalog.getDatabase`
+message GetDatabase {
+  // (Required)
+  string db_name = 1;
+}
+
+// See `spark.catalog.getTable`
+message GetTable {
+  // (Required)
+  string table_name = 1;
+  // (Optional)
+  optional string db_name = 2;
+}
+
+// See `spark.catalog.getFunction`
+message GetFunction {
+  // (Required)
+  string function_name = 1;
+  // (Optional)
+  optional string db_name = 2;
+}
+
+// See `spark.catalog.databaseExists`
+message DatabaseExists {
+  // (Required)
+  string db_name = 1;
+}
+
+// See `spark.catalog.tableExists`
+message TableExists {
+  // (Required)
+  string table_name = 1;
+  // (Optional)
+  optional string db_name = 2;
+}
+
+// See `spark.catalog.functionExists`
+message FunctionExists {
+  // (Required)
+  string function_name = 1;
+  // (Optional)
+  optional string db_name = 2;
+}
+
+// See `spark.catalog.createExternalTable`
+message CreateExternalTable {
+  // (Required)
+  string table_name = 1;
+  // (Optional)
+  optional string path = 2;
+  // (Optional)
+  optional string source = 3;
+  // (Optional)
+  optional DataType schema = 4;
+  // Options could be empty for valid data source format.
+  // The map key is case insensitive.
+  map<string, string> options = 5;
+}
+
+// See `spark.catalog.createTable`
+message CreateTable {
+  // (Required)
+  string table_name = 1;
+  // (Optional)
+  optional string path = 2;
+  // (Optional)
+  optional string source = 3;
+  // (Optional)
+  optional string description = 4;
+  // (Optional)
+  optional DataType schema = 5;
+  // Options could be empty for valid data source format.
+  // The map key is case insensitive.
+  map<string, string> options = 6;
+}
+
+// See `spark.catalog.dropTempView`
+message DropTempView {
+  // (Required)
+  string view_name = 1;
+}
+
+// See `spark.catalog.dropGlobalTempView`
+message DropGlobalTempView {
+  // (Required)
+  string view_name = 1;
+}
+
+// See `spark.catalog.recoverPartitions`
+message RecoverPartitions {
+  // (Required)
+  string table_name = 1;
+}
+
+// TODO(SPARK-41612): Support Catalog.isCached
+//// See `spark.catalog.isCached`
+//message IsCached {
+//  // (Required)
+//  string table_name = 1;
+//}
+//
+// TODO(SPARK-41600): Support Catalog.cacheTable
+//// See `spark.catalog.cacheTable`
+//message CacheTable {
+//  // (Required)
+//  string table_name = 1;
+//}
+//
+// TODO(SPARK-41623): Support Catalog.uncacheTable
+//// See `spark.catalog.uncacheTable`
+//message UncacheTable {
+//  // (Required)
+//  string table_name = 1;
+//}
+
+// See `spark.catalog.clearCache`
+message ClearCache { }
+
+// See `spark.catalog.refreshTable`
+message RefreshTable {
+  // (Required)
+  string table_name = 1;
+}
+
+// See `spark.catalog.refreshByPath`
+message RefreshByPath {
+  // (Required)
+  string path = 1;
+}
+
+// See `spark.catalog.currentCatalog`
+message CurrentCatalog { }
+
+// See `spark.catalog.setCurrentCatalog`
+message SetCurrentCatalog {
+  // (Required)
+  string catalog_name = 1;
+}
+
+// See `spark.catalog.listCatalogs`
+message ListCatalogs { }
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index b40a865cb29..ca17b220841 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -21,6 +21,7 @@ package spark.connect;
 
 import "spark/connect/expressions.proto";
 import "spark/connect/types.proto";
+import "spark/connect/catalog.proto";
 
 option java_multiple_files = true;
 option java_package = "org.apache.spark.connect.proto";
@@ -68,6 +69,37 @@ message Relation {
     StatCrosstab crosstab = 101;
     StatDescribe describe = 102;
 
+    // Catalog API (internal-only)
+    CurrentDatabase current_database = 201;
+    SetCurrentDatabase set_current_database = 202;
+    ListDatabases list_databases = 203;
+    ListTables list_tables = 204;
+    ListFunctions list_functions = 205;
+    ListColumns list_columns = 206;
+    GetDatabase get_database = 207;
+    GetTable get_table = 208;
+    GetFunction get_function = 209;
+    DatabaseExists database_exists = 210;
+    TableExists table_exists = 211;
+    FunctionExists function_exists = 212;
+    CreateExternalTable create_external_table = 213;
+    CreateTable create_table = 214;
+    DropTempView drop_temp_view = 215;
+    DropGlobalTempView drop_global_temp_view = 216;
+    RecoverPartitions recover_partitions = 217;
+// TODO(SPARK-41612): Support Catalog.isCached
+//    IsCached is_cached = 218;
+// TODO(SPARK-41600): Support Catalog.cacheTable
+//    CacheTable cache_table = 219;
+// TODO(SPARK-41623): Support Catalog.uncacheTable
+//    UncacheTable uncache_table = 220;
+    ClearCache clear_cache = 221;
+    RefreshTable refresh_table = 222;
+    RefreshByPath refresh_by_path = 223;
+    CurrentCatalog current_catalog = 224;
+    SetCurrentCatalog set_current_catalog = 225;
+    ListCatalogs list_catalogs = 226;
+
     Unknown unknown = 999;
   }
 }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index cb8d30b180c..21ff96f158e 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -25,8 +25,7 @@ import com.google.common.collect.{Lists, Maps}
 import org.apache.spark.TaskContext
 import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
 import org.apache.spark.connect.proto
-import org.apache.spark.connect.proto.WriteOperation
-import org.apache.spark.sql.{Column, Dataset, SparkSession}
+import org.apache.spark.sql.{Column, Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.{expressions, AliasIdentifier, 
FunctionIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, 
MultiAlias, UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, 
UnresolvedRegex, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.expressions._
@@ -41,6 +40,7 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.execution.command.CreateViewCommand
 import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
+import org.apache.spark.sql.internal.CatalogImpl
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -61,6 +61,7 @@ class SparkConnectPlanner(session: SparkSession) {
   // The root of the query plan is a relation and we apply the transformations 
to it.
   def transformRelation(rel: proto.Relation): LogicalPlan = {
     rel.getRelTypeCase match {
+      // DataFrame API
       case proto.Relation.RelTypeCase.SHOW_STRING => 
transformShowString(rel.getShowString)
       case proto.Relation.RelTypeCase.READ => transformReadRel(rel.getRead)
       case proto.Relation.RelTypeCase.PROJECT => 
transformProject(rel.getProject)
@@ -99,6 +100,50 @@ class SparkConnectPlanner(session: SparkSession) {
       case proto.Relation.RelTypeCase.UNPIVOT => 
transformUnpivot(rel.getUnpivot)
       case proto.Relation.RelTypeCase.RELTYPE_NOT_SET =>
         throw new IndexOutOfBoundsException("Expected Relation to be set, but 
is empty.")
+
+      // Catalog API (internal-only)
+      case proto.Relation.RelTypeCase.CURRENT_DATABASE =>
+        transformCurrentDatabase(rel.getCurrentDatabase)
+      case proto.Relation.RelTypeCase.SET_CURRENT_DATABASE =>
+        transformSetCurrentDatabase(rel.getSetCurrentDatabase)
+      case proto.Relation.RelTypeCase.LIST_DATABASES =>
+        transformListDatabases(rel.getListDatabases)
+      case proto.Relation.RelTypeCase.LIST_TABLES => 
transformListTables(rel.getListTables)
+      case proto.Relation.RelTypeCase.LIST_FUNCTIONS =>
+        transformListFunctions(rel.getListFunctions)
+      case proto.Relation.RelTypeCase.LIST_COLUMNS => 
transformListColumns(rel.getListColumns)
+      case proto.Relation.RelTypeCase.GET_DATABASE => 
transformGetDatabase(rel.getGetDatabase)
+      case proto.Relation.RelTypeCase.GET_TABLE => 
transformGetTable(rel.getGetTable)
+      case proto.Relation.RelTypeCase.GET_FUNCTION => 
transformGetFunction(rel.getGetFunction)
+      case proto.Relation.RelTypeCase.DATABASE_EXISTS =>
+        transformDatabaseExists(rel.getDatabaseExists)
+      case proto.Relation.RelTypeCase.TABLE_EXISTS => 
transformTableExists(rel.getTableExists)
+      case proto.Relation.RelTypeCase.FUNCTION_EXISTS =>
+        transformFunctionExists(rel.getFunctionExists)
+      case proto.Relation.RelTypeCase.CREATE_EXTERNAL_TABLE =>
+        transformCreateExternalTable(rel.getCreateExternalTable)
+      case proto.Relation.RelTypeCase.CREATE_TABLE => 
transformCreateTable(rel.getCreateTable)
+      case proto.Relation.RelTypeCase.DROP_TEMP_VIEW => 
transformDropTempView(rel.getDropTempView)
+      case proto.Relation.RelTypeCase.DROP_GLOBAL_TEMP_VIEW =>
+        transformDropGlobalTempView(rel.getDropGlobalTempView)
+      case proto.Relation.RelTypeCase.RECOVER_PARTITIONS =>
+        transformRecoverPartitions(rel.getRecoverPartitions)
+// TODO(SPARK-41612): Support Catalog.isCached
+//      case proto.Relation.RelTypeCase.IS_CACHED => 
transformIsCached(rel.getIsCached)
+// TODO(SPARK-41600): Support Catalog.cacheTable
+//      case proto.Relation.RelTypeCase.CACHE_TABLE => 
transformCacheTable(rel.getCacheTable)
+// TODO(SPARK-41623): Support Catalog.uncacheTable
+//      case proto.Relation.RelTypeCase.UNCACHE_TABLE => 
transformUncacheTable(rel.getUncacheTable)
+      case proto.Relation.RelTypeCase.CLEAR_CACHE => 
transformClearCache(rel.getClearCache)
+      case proto.Relation.RelTypeCase.REFRESH_TABLE => 
transformRefreshTable(rel.getRefreshTable)
+      case proto.Relation.RelTypeCase.REFRESH_BY_PATH =>
+        transformRefreshByPath(rel.getRefreshByPath)
+      case proto.Relation.RelTypeCase.CURRENT_CATALOG =>
+        transformCurrentCatalog(rel.getCurrentCatalog)
+      case proto.Relation.RelTypeCase.SET_CURRENT_CATALOG =>
+        transformSetCurrentCatalog(rel.getSetCurrentCatalog)
+      case proto.Relation.RelTypeCase.LIST_CATALOGS => 
transformListCatalogs(rel.getListCatalogs)
+
       case _ => throw InvalidPlanInput(s"${rel.getUnknown} not supported.")
     }
   }
@@ -1041,7 +1086,7 @@ class SparkConnectPlanner(session: SparkSession) {
    *
    * @param writeOperation
    */
-  def handleWriteOperation(writeOperation: WriteOperation): Unit = {
+  def handleWriteOperation(writeOperation: proto.WriteOperation): Unit = {
     // Transform the input plan into the logical plan.
     val planner = new SparkConnectPlanner(session)
     val plan = planner.transformRelation(writeOperation.getInput)
@@ -1092,4 +1137,264 @@ class SparkConnectPlanner(session: SparkSession) {
     }
   }
 
+  private val emptyLocalRelation = LocalRelation(
+    output = AttributeReference("value", StringType, false)() :: Nil,
+    data = Seq.empty)
+
+  private def transformCurrentDatabase(getCurrentDatabase: 
proto.CurrentDatabase): LogicalPlan = {
+    session.createDataset(session.catalog.currentDatabase :: 
Nil)(Encoders.STRING).logicalPlan
+  }
+
+  private def transformSetCurrentDatabase(
+      getSetCurrentDatabase: proto.SetCurrentDatabase): LogicalPlan = {
+    session.catalog.setCurrentDatabase(getSetCurrentDatabase.getDbName)
+    emptyLocalRelation
+  }
+
+  private def transformListDatabases(getListDatabases: proto.ListDatabases): 
LogicalPlan = {
+    session.catalog.listDatabases().logicalPlan
+  }
+
+  private def transformListTables(getListTables: proto.ListTables): 
LogicalPlan = {
+    if (getListTables.hasDbName) {
+      session.catalog.listTables(getListTables.getDbName).logicalPlan
+    } else {
+      session.catalog.listTables().logicalPlan
+    }
+  }
+
+  private def transformListFunctions(getListFunctions: proto.ListFunctions): 
LogicalPlan = {
+    if (getListFunctions.hasDbName) {
+      session.catalog.listFunctions(getListFunctions.getDbName).logicalPlan
+    } else {
+      session.catalog.listFunctions().logicalPlan
+    }
+  }
+
+  private def transformListColumns(getListColumns: proto.ListColumns): 
LogicalPlan = {
+    if (getListColumns.hasDbName) {
+      session.catalog
+        .listColumns(dbName = getListColumns.getDbName, tableName = 
getListColumns.getTableName)
+        .logicalPlan
+    } else {
+      session.catalog.listColumns(getListColumns.getTableName).logicalPlan
+    }
+  }
+
+  private def transformGetDatabase(getGetDatabase: proto.GetDatabase): 
LogicalPlan = {
+    CatalogImpl
+      .makeDataset(session.catalog.getDatabase(getGetDatabase.getDbName) :: 
Nil, session)
+      .logicalPlan
+  }
+
+  private def transformGetTable(getGetTable: proto.GetTable): LogicalPlan = {
+    if (getGetTable.hasDbName) {
+      CatalogImpl
+        .makeDataset(
+          session.catalog.getTable(
+            dbName = getGetTable.getDbName,
+            tableName = getGetTable.getTableName) :: Nil,
+          session)
+        .logicalPlan
+    } else {
+      CatalogImpl
+        .makeDataset(session.catalog.getTable(getGetTable.getTableName) :: 
Nil, session)
+        .logicalPlan
+    }
+  }
+
+  private def transformGetFunction(getGetFunction: proto.GetFunction): 
LogicalPlan = {
+    if (getGetFunction.hasDbName) {
+      CatalogImpl
+        .makeDataset(
+          session.catalog.getFunction(
+            dbName = getGetFunction.getDbName,
+            functionName = getGetFunction.getFunctionName) :: Nil,
+          session)
+        .logicalPlan
+    } else {
+      CatalogImpl
+        
.makeDataset(session.catalog.getFunction(getGetFunction.getFunctionName) :: 
Nil, session)
+        .logicalPlan
+    }
+  }
+
+  private def transformDatabaseExists(getDatabaseExists: 
proto.DatabaseExists): LogicalPlan = {
+    session
+      
.createDataset(session.catalog.databaseExists(getDatabaseExists.getDbName) :: 
Nil)(
+        Encoders.scalaBoolean)
+      .logicalPlan
+  }
+
+  private def transformTableExists(getTableExists: proto.TableExists): 
LogicalPlan = {
+    if (getTableExists.hasDbName) {
+      session
+        .createDataset(
+          session.catalog.tableExists(
+            dbName = getTableExists.getDbName,
+            tableName = getTableExists.getTableName) :: 
Nil)(Encoders.scalaBoolean)
+        .logicalPlan
+    } else {
+      session
+        
.createDataset(session.catalog.tableExists(getTableExists.getTableName) :: Nil)(
+          Encoders.scalaBoolean)
+        .logicalPlan
+    }
+  }
+
+  private def transformFunctionExists(getFunctionExists: 
proto.FunctionExists): LogicalPlan = {
+    if (getFunctionExists.hasDbName) {
+      session
+        .createDataset(
+          session.catalog.functionExists(
+            dbName = getFunctionExists.getDbName,
+            functionName = getFunctionExists.getFunctionName) :: 
Nil)(Encoders.scalaBoolean)
+        .logicalPlan
+    } else {
+      session
+        
.createDataset(session.catalog.functionExists(getFunctionExists.getFunctionName)
 :: Nil)(
+          Encoders.scalaBoolean)
+        .logicalPlan
+    }
+  }
+
+  private def transformCreateExternalTable(
+      getCreateExternalTable: proto.CreateExternalTable): LogicalPlan = {
+    val schema = if (getCreateExternalTable.hasSchema) {
+      val struct = 
DataTypeProtoConverter.toCatalystType(getCreateExternalTable.getSchema)
+      assert(struct.isInstanceOf[StructType])
+      struct.asInstanceOf[StructType]
+    } else {
+      new StructType
+    }
+
+    val source = if (getCreateExternalTable.hasSource) {
+      getCreateExternalTable.getSource
+    } else {
+      session.sessionState.conf.defaultDataSourceName
+    }
+
+    val options = if (getCreateExternalTable.hasPath) {
+      (getCreateExternalTable.getOptionsMap.asScala ++
+        Map("path" -> getCreateExternalTable.getPath)).asJava
+    } else {
+      getCreateExternalTable.getOptionsMap
+    }
+    session.catalog
+      .createTable(
+        tableName = getCreateExternalTable.getTableName,
+        source = source,
+        schema = schema,
+        options = options)
+      .logicalPlan
+  }
+
+  private def transformCreateTable(getCreateTable: proto.CreateTable): 
LogicalPlan = {
+    val schema = if (getCreateTable.hasSchema) {
+      val struct = 
DataTypeProtoConverter.toCatalystType(getCreateTable.getSchema)
+      assert(struct.isInstanceOf[StructType])
+      struct.asInstanceOf[StructType]
+    } else {
+      new StructType
+    }
+
+    val source = if (getCreateTable.hasSource) {
+      getCreateTable.getSource
+    } else {
+      session.sessionState.conf.defaultDataSourceName
+    }
+
+    val description = if (getCreateTable.hasDescription) {
+      getCreateTable.getDescription
+    } else {
+      ""
+    }
+
+    val options = if (getCreateTable.hasPath) {
+      (getCreateTable.getOptionsMap.asScala ++
+        Map("path" -> getCreateTable.getPath)).asJava
+    } else {
+      getCreateTable.getOptionsMap
+    }
+
+    session.catalog
+      .createTable(
+        tableName = getCreateTable.getTableName,
+        source = source,
+        schema = schema,
+        description = description,
+        options = options)
+      .logicalPlan
+  }
+
+  private def transformDropTempView(getDropTempView: proto.DropTempView): 
LogicalPlan = {
+    session
+      .createDataset(session.catalog.dropTempView(getDropTempView.getViewName) 
:: Nil)(
+        Encoders.scalaBoolean)
+      .logicalPlan
+  }
+
+  private def transformDropGlobalTempView(
+      getDropGlobalTempView: proto.DropGlobalTempView): LogicalPlan = {
+    session
+      .createDataset(
+        session.catalog.dropGlobalTempView(getDropGlobalTempView.getViewName) 
:: Nil)(
+        Encoders.scalaBoolean)
+      .logicalPlan
+  }
+
+  private def transformRecoverPartitions(
+      getRecoverPartitions: proto.RecoverPartitions): LogicalPlan = {
+    session.catalog.recoverPartitions(getRecoverPartitions.getTableName)
+    emptyLocalRelation
+  }
+
+// TODO(SPARK-41612): Support Catalog.isCached
+//  private def transformIsCached(getIsCached: proto.IsCached): LogicalPlan = {
+//    session
+//      .createDataset(session.catalog.isCached(getIsCached.getTableName) :: 
Nil)(
+//        Encoders.scalaBoolean)
+//      .logicalPlan
+//  }
+//
+// TODO(SPARK-41600): Support Catalog.cacheTable
+//  private def transformCacheTable(getCacheTable: proto.CacheTable): 
LogicalPlan = {
+//    session.catalog.cacheTable(getCacheTable.getTableName)
+//    emptyLocalRelation
+//  }
+//
+// TODO(SPARK-41623): Support Catalog.uncacheTable
+//  private def transformUncacheTable(getUncacheTable: proto.UncacheTable): 
LogicalPlan = {
+//    session.catalog.uncacheTable(getUncacheTable.getTableName)
+//    emptyLocalRelation
+//  }
+
+  private def transformClearCache(getClearCache: proto.ClearCache): 
LogicalPlan = {
+    session.catalog.clearCache()
+    emptyLocalRelation
+  }
+
+  private def transformRefreshTable(getRefreshTable: proto.RefreshTable): 
LogicalPlan = {
+    session.catalog.refreshTable(getRefreshTable.getTableName)
+    emptyLocalRelation
+  }
+
+  private def transformRefreshByPath(getRefreshByPath: proto.RefreshByPath): 
LogicalPlan = {
+    session.catalog.refreshByPath(getRefreshByPath.getPath)
+    emptyLocalRelation
+  }
+
+  private def transformCurrentCatalog(getCurrentCatalog: 
proto.CurrentCatalog): LogicalPlan = {
+    session.createDataset(session.catalog.currentCatalog() :: 
Nil)(Encoders.STRING).logicalPlan
+  }
+
+  private def transformSetCurrentCatalog(
+      getSetCurrentCatalog: proto.SetCurrentCatalog): LogicalPlan = {
+    session.catalog.setCurrentCatalog(getSetCurrentCatalog.getCatalogName)
+    emptyLocalRelation
+  }
+
+  private def transformListCatalogs(getListCatalogs: proto.ListCatalogs): 
LogicalPlan = {
+    session.catalog.listCatalogs().logicalPlan
+  }
 }
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 51f5246d741..84e258d20e9 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -512,6 +512,7 @@ pyspark_connect = Module(
         "pyspark.sql.tests.connect.test_connect_basic",
         "pyspark.sql.tests.connect.test_connect_function",
         "pyspark.sql.tests.connect.test_connect_column",
+        "pyspark.sql.tests.connect.test_parity_catalog",
         "pyspark.sql.tests.connect.test_parity_functions",
         "pyspark.sql.tests.connect.test_parity_dataframe",
     ],
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 4e8c5d96d01..77dbc3e024a 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -91,6 +91,9 @@ class Catalog:
 
         .. versionadded:: 3.4.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Examples
         --------
         >>> spark.catalog.currentCatalog()
@@ -103,6 +106,9 @@ class Catalog:
 
         .. versionadded:: 3.4.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         catalogName : str
@@ -119,6 +125,9 @@ class Catalog:
 
         .. versionadded:: 3.4.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Returns
         -------
         list
@@ -137,6 +146,9 @@ class Catalog:
 
         .. versionadded:: 2.0.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Returns
         -------
         str
@@ -155,6 +167,9 @@ class Catalog:
 
         .. versionadded:: 2.0.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Examples
         --------
         >>> spark.catalog.setCurrentDatabase("default")
@@ -167,6 +182,9 @@ class Catalog:
 
         .. versionadded:: 2.0.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Returns
         -------
         list
@@ -197,6 +215,9 @@ class Catalog:
 
         .. versionadded:: 3.4.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         dbName : str
@@ -230,6 +251,9 @@ class Catalog:
 
         .. versionadded:: 3.3.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         dbName : str
@@ -266,6 +290,9 @@ class Catalog:
 
         .. versionadded:: 2.0.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         dbName : str
@@ -325,6 +352,9 @@ class Catalog:
 
         .. versionadded:: 3.4.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         tableName : str
@@ -381,6 +411,9 @@ class Catalog:
 
         .. versionadded:: 3.4.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         dbName : str
@@ -432,6 +465,9 @@ class Catalog:
 
         .. versionadded:: 3.3.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         functionName : str
@@ -481,6 +517,9 @@ class Catalog:
 
         .. versionadded:: 3.4.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         functionName : str
@@ -531,6 +570,9 @@ class Catalog:
 
         .. versionadded:: 2.0.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         tableName : str
@@ -594,6 +636,9 @@ class Catalog:
 
         .. versionadded:: 3.3.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         tableName : str
@@ -692,6 +737,9 @@ class Catalog:
 
         .. versionadded:: 2.0.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Returns
         -------
         :class:`DataFrame`
@@ -715,6 +763,9 @@ class Catalog:
 
         .. versionadded:: 2.2.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         tableName : str
@@ -785,6 +836,9 @@ class Catalog:
 
         .. versionadded:: 2.0.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         viewName : str
@@ -822,6 +876,9 @@ class Catalog:
 
         .. versionadded:: 2.1.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         viewName : str
@@ -987,6 +1044,9 @@ class Catalog:
 
         .. versionadded:: 2.0.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Examples
         --------
         >>> _ = spark.sql("DROP TABLE IF EXISTS tbl1")
@@ -1003,6 +1063,9 @@ class Catalog:
 
         .. versionadded:: 2.0.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         tableName : str
@@ -1051,6 +1114,9 @@ class Catalog:
 
         .. versionadded:: 2.1.1
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         tableName : str
@@ -1095,6 +1161,9 @@ class Catalog:
 
         .. versionadded:: 2.2.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         path : str
diff --git a/python/pyspark/sql/connect/catalog.py b/python/pyspark/sql/connect/catalog.py
new file mode 100644
index 00000000000..ed673f1d0d3
--- /dev/null
+++ b/python/pyspark/sql/connect/catalog.py
@@ -0,0 +1,307 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from typing import List, Optional, TYPE_CHECKING
+
+import pandas as pd
+
+from pyspark.sql.types import StructType
+from pyspark.sql.connect import DataFrame
+from pyspark.sql.catalog import (
+    Catalog as PySparkCatalog,
+    CatalogMetadata,
+    Database,
+    Table,
+    Function,
+    Column,
+)
+from pyspark.sql.connect import plan
+
+if TYPE_CHECKING:
+    from pyspark.sql.connect.session import SparkSession
+
+
+class Catalog:
+    def __init__(self, sparkSession: "SparkSession") -> None:
+        self._sparkSession = sparkSession
+
+    # TODO(SPARK-41716): Probably should factor out to 
pyspark.sql.connect.client.
+    def _catalog_to_pandas(self, catalog: plan.LogicalPlan) -> pd.DataFrame:
+        pdf = DataFrame.withPlan(catalog, 
session=self._sparkSession).toPandas()
+        assert pdf is not None
+        return pdf
+
+    def currentCatalog(self) -> str:
+        pdf = self._catalog_to_pandas(plan.CurrentCatalog())
+        assert pdf is not None
+        return pdf.iloc[0].iloc[0]
+
+    currentCatalog.__doc__ = PySparkCatalog.currentCatalog.__doc__
+
+    def setCurrentCatalog(self, catalogName: str) -> None:
+        
self._catalog_to_pandas(plan.SetCurrentCatalog(catalog_name=catalogName))
+
+    setCurrentCatalog.__doc__ = PySparkCatalog.setCurrentCatalog.__doc__
+
+    def listCatalogs(self) -> List[CatalogMetadata]:
+        pdf = self._catalog_to_pandas(plan.ListCatalogs())
+        return [
+            CatalogMetadata(name=row.iloc[0], description=row.iloc[1]) for _, 
row in pdf.iterrows()
+        ]
+
+    listCatalogs.__doc__ = PySparkCatalog.listCatalogs.__doc__
+
+    def currentDatabase(self) -> str:
+        pdf = self._catalog_to_pandas(plan.CurrentDatabase())
+        assert pdf is not None
+        return pdf.iloc[0].iloc[0]
+
+    currentDatabase.__doc__ = PySparkCatalog.currentDatabase.__doc__
+
+    def setCurrentDatabase(self, dbName: str) -> None:
+        self._catalog_to_pandas(plan.SetCurrentDatabase(db_name=dbName))
+
+    setCurrentDatabase.__doc__ = PySparkCatalog.setCurrentDatabase.__doc__
+
+    def listDatabases(self) -> List[Database]:
+        pdf = self._catalog_to_pandas(plan.ListDatabases())
+        return [
+            Database(
+                name=row.iloc[0],
+                catalog=row.iloc[1],
+                description=row.iloc[2],
+                locationUri=row.iloc[3],
+            )
+            for _, row in pdf.iterrows()
+        ]
+
+    listDatabases.__doc__ = PySparkCatalog.listDatabases.__doc__
+
+    def getDatabase(self, dbName: str) -> Database:
+        pdf = self._catalog_to_pandas(plan.GetDatabase(db_name=dbName))
+        assert pdf is not None
+        row = pdf.iloc[0]
+        return Database(
+            name=row[0],
+            catalog=row[1],
+            description=row[2],
+            locationUri=row[3],
+        )
+
+    getDatabase.__doc__ = PySparkCatalog.getDatabase.__doc__
+
+    def databaseExists(self, dbName: str) -> bool:
+        pdf = self._catalog_to_pandas(plan.DatabaseExists(db_name=dbName))
+        assert pdf is not None
+        return pdf.iloc[0].iloc[0]
+
+    databaseExists.__doc__ = PySparkCatalog.databaseExists.__doc__
+
+    def listTables(self, dbName: Optional[str] = None) -> List[Table]:
+        pdf = self._catalog_to_pandas(plan.ListTables(db_name=dbName))
+        return [
+            Table(
+                name=row.iloc[0],
+                catalog=row.iloc[1],
+                # If empty or None, returns None.
+                namespace=None if row.iloc[2] is None else list(row.iloc[2]) 
or None,
+                description=row.iloc[3],
+                tableType=row.iloc[4],
+                isTemporary=row.iloc[5],
+            )
+            for _, row in pdf.iterrows()
+        ]
+
+    listTables.__doc__ = PySparkCatalog.listTables.__doc__
+
+    def getTable(self, tableName: str) -> Table:
+        pdf = self._catalog_to_pandas(plan.GetTable(table_name=tableName))
+        assert pdf is not None
+        row = pdf.iloc[0]
+        return Table(
+            name=row.iloc[0],
+            catalog=row.iloc[1],
+            # If empty or None, returns None.
+            namespace=None if row.iloc[2] is None else list(row.iloc[2]) or 
None,
+            description=row.iloc[3],
+            tableType=row.iloc[4],
+            isTemporary=row.iloc[5],
+        )
+
+    getTable.__doc__ = PySparkCatalog.getTable.__doc__
+
+    def listFunctions(self, dbName: Optional[str] = None) -> List[Function]:
+        pdf = self._catalog_to_pandas(plan.ListFunctions(db_name=dbName))
+        return [
+            Function(
+                name=row.iloc[0],
+                catalog=row.iloc[1],
+                # If empty or None, returns None.
+                namespace=None if row.iloc[2] is None else list(row.iloc[2]) 
or None,
+                description=row.iloc[3],
+                className=row.iloc[4],
+                isTemporary=row.iloc[5],
+            )
+            for _, row in pdf.iterrows()
+        ]
+
+    listFunctions.__doc__ = PySparkCatalog.listFunctions.__doc__
+
+    def functionExists(self, functionName: str, dbName: Optional[str] = None) 
-> bool:
+        pdf = self._catalog_to_pandas(
+            plan.FunctionExists(function_name=functionName, db_name=dbName)
+        )
+        assert pdf is not None
+        return pdf.iloc[0].iloc[0]
+
+    functionExists.__doc__ = PySparkCatalog.functionExists.__doc__
+
+    def getFunction(self, functionName: str) -> Function:
+        pdf = 
self._catalog_to_pandas(plan.GetFunction(function_name=functionName))
+        assert pdf is not None
+        row = pdf.iloc[0]
+        return Function(
+            name=row.iloc[0],
+            catalog=row.iloc[1],
+            # If empty or None, returns None.
+            namespace=None if row.iloc[2] is None else list(row.iloc[2]) or 
None,
+            description=row.iloc[3],
+            className=row.iloc[4],
+            isTemporary=row.iloc[5],
+        )
+
+    getFunction.__doc__ = PySparkCatalog.getFunction.__doc__
+
+    def listColumns(self, tableName: str, dbName: Optional[str] = None) -> 
List[Column]:
+        pdf = self._catalog_to_pandas(plan.ListColumns(table_name=tableName, 
db_name=dbName))
+        return [
+            Column(
+                name=row.iloc[0],
+                description=row.iloc[1],
+                dataType=row.iloc[2],
+                nullable=row.iloc[3],
+                isPartition=row.iloc[4],
+                isBucket=row.iloc[5],
+            )
+            for _, row in pdf.iterrows()
+        ]
+
+    listColumns.__doc__ = PySparkCatalog.listColumns.__doc__
+
+    def tableExists(self, tableName: str, dbName: Optional[str] = None) -> 
bool:
+        pdf = self._catalog_to_pandas(plan.TableExists(table_name=tableName, 
db_name=dbName))
+        assert pdf is not None
+        return pdf.iloc[0].iloc[0]
+
+    tableExists.__doc__ = PySparkCatalog.tableExists.__doc__
+
+    def createExternalTable(
+        self,
+        tableName: str,
+        path: Optional[str] = None,
+        source: Optional[str] = None,
+        schema: Optional[StructType] = None,
+        **options: str,
+    ) -> DataFrame:
+        catalog = plan.CreateExternalTable(
+            table_name=tableName,
+            path=path,  # type: ignore[arg-type]
+            source=source,
+            schema=schema,
+            options=options,
+        )
+        df = DataFrame.withPlan(catalog, session=self._sparkSession)
+        df.toPandas()  # Eager execution.
+        return df
+
+    createExternalTable.__doc__ = PySparkCatalog.createExternalTable.__doc__
+
+    def createTable(
+        self,
+        tableName: str,
+        path: Optional[str] = None,
+        source: Optional[str] = None,
+        schema: Optional[StructType] = None,
+        description: Optional[str] = None,
+        **options: str,
+    ) -> DataFrame:
+        catalog = plan.CreateTable(
+            table_name=tableName,
+            path=path,  # type: ignore[arg-type]
+            source=source,
+            schema=schema,
+            description=description,
+            options=options,
+        )
+        df = DataFrame.withPlan(catalog, session=self._sparkSession)
+        df.toPandas()  # Eager execution.
+        return df
+
+    createTable.__doc__ = PySparkCatalog.createTable.__doc__
+
+    def dropTempView(self, viewName: str) -> bool:
+        pdf = self._catalog_to_pandas(plan.DropTempView(view_name=viewName))
+        assert pdf is not None
+        return pdf.iloc[0].iloc[0]
+
+    dropTempView.__doc__ = PySparkCatalog.dropTempView.__doc__
+
+    def dropGlobalTempView(self, viewName: str) -> bool:
+        pdf = 
self._catalog_to_pandas(plan.DropGlobalTempView(view_name=viewName))
+        assert pdf is not None
+        return pdf.iloc[0].iloc[0]
+
+    dropGlobalTempView.__doc__ = PySparkCatalog.dropGlobalTempView.__doc__
+
+    # TODO(SPARK-41612): Support Catalog.isCached
+    # def isCached(self, tableName: str) -> bool:
+    #     pdf = self._catalog_to_pandas(plan.IsCached(table_name=tableName))
+    #     assert pdf is not None
+    #     return pdf.iloc[0].iloc[0]
+    #
+    # isCached.__doc__ = PySparkCatalog.isCached.__doc__
+    #
+    # TODO(SPARK-41600): Support Catalog.cacheTable
+    # def cacheTable(self, tableName: str) -> None:
+    #     self._catalog_to_pandas(plan.CacheTable(table_name=tableName))
+    #
+    # cacheTable.__doc__ = PySparkCatalog.cacheTable.__doc__
+    #
+    # TODO(SPARK-41623): Support Catalog.uncacheTable
+    # def uncacheTable(self, tableName: str) -> None:
+    #     self._catalog_to_pandas(plan.UncacheTable(table_name=tableName))
+    #
+    # uncacheTable.__doc__ = PySparkCatalog.uncacheTable.__doc__
+
+    def clearCache(self) -> None:
+        self._catalog_to_pandas(plan.ClearCache())
+
+    clearCache.__doc__ = PySparkCatalog.clearCache.__doc__
+
+    def refreshTable(self, tableName: str) -> None:
+        self._catalog_to_pandas(plan.RefreshTable(table_name=tableName))
+
+    refreshTable.__doc__ = PySparkCatalog.refreshTable.__doc__
+
+    def recoverPartitions(self, tableName: str) -> None:
+        self._catalog_to_pandas(plan.RecoverPartitions(table_name=tableName))
+
+    recoverPartitions.__doc__ = PySparkCatalog.recoverPartitions.__doc__
+
+    def refreshByPath(self, path: str) -> None:
+        self._catalog_to_pandas(plan.RefreshByPath(path=path))
+
+    refreshByPath.__doc__ = PySparkCatalog.refreshByPath.__doc__
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index fe2105d7d2c..468d028cee2 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -95,6 +95,7 @@ class LogicalPlan(object):
 
         return plan
 
+    # TODO(SPARK-41717): Implement the command logic for print and _repr_html_
     def print(self, indent: int = 0) -> str:
         ...
 
@@ -1569,3 +1570,785 @@ class WriteOperation(LogicalPlan):
             f"</li></ul>"
         )
         pass
+
+
+class CurrentDatabase(LogicalPlan):
+    def __init__(self) -> None:
+        super().__init__(None)
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        return proto.Relation(current_database=proto.CurrentDatabase())
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return f"{i}" f"<{self.__class__.__name__}>"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b>
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class SetCurrentDatabase(LogicalPlan):
+    def __init__(self, db_name: str) -> None:
+        super().__init__(None)
+        self._db_name = db_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        plan.set_current_database.db_name = self._db_name
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return f"{i}" f"<{self.__class__.__name__} db_name='{self._db_name}'>"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              db_name: {self._db_name} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class ListDatabases(LogicalPlan):
+    def __init__(self) -> None:
+        super().__init__(None)
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        return proto.Relation(list_databases=proto.ListDatabases())
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return f"{i}" f"<{self.__class__.__name__}>"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b>
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class ListTables(LogicalPlan):
+    def __init__(self, db_name: Optional[str] = None) -> None:
+        super().__init__(None)
+        self._db_name = db_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        if self._db_name is not None:
+            plan.list_tables.db_name = self._db_name
+        else:
+            plan = proto.Relation(list_tables=proto.ListTables())
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return f"{i}" f"<{self.__class__.__name__} db_name='{self._db_name}'>"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              db_name: {self._db_name} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class ListFunctions(LogicalPlan):
+    def __init__(self, db_name: Optional[str] = None) -> None:
+        super().__init__(None)
+        self._db_name = db_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        if self._db_name is not None:
+            plan = proto.Relation()
+            plan.list_functions.db_name = self._db_name
+        else:
+            plan = proto.Relation(list_functions=proto.ListFunctions())
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return f"{i}" f"<{self.__class__.__name__} db_name='{self._db_name}'>"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              db_name: {self._db_name} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class ListColumns(LogicalPlan):
+    def __init__(self, table_name: str, db_name: Optional[str] = None) -> None:
+        super().__init__(None)
+        self._table_name = table_name
+        self._db_name = db_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        plan.list_columns.table_name = self._table_name
+        if self._db_name is not None:
+            plan.list_columns.db_name = self._db_name
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return (
+            f"{i}"
+            f"<{self.__class__.__name__}"
+            f"table_name='{self._table_name}' "
+            f"db_name='{self._db_name}'>"
+        )
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              table_name: {self._table_name} <br />
+              db_name: {self._db_name} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class GetDatabase(LogicalPlan):
+    def __init__(self, db_name: str) -> None:
+        super().__init__(None)
+        self._db_name = db_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        plan.get_database.db_name = self._db_name
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return f"{i}" f"<{self.__class__.__name__} db_name='{self._db_name}'>"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              db_name: {self._db_name} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class GetTable(LogicalPlan):
+    def __init__(self, table_name: str, db_name: Optional[str] = None) -> None:
+        super().__init__(None)
+        self._table_name = table_name
+        self._db_name = db_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        plan.get_table.table_name = self._table_name
+        if self._db_name is not None:
+            plan.get_table.db_name = self._db_name
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return (
+            f"{i}"
+            f"<{self.__class__.__name__}"
+            f"table_name='{self._table_name}' "
+            f"db_name='{self._db_name}'>"
+        )
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              table_name: {self._table_name} <br />
+              db_name: {self._db_name} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class GetFunction(LogicalPlan):
+    def __init__(self, function_name: str, db_name: Optional[str] = None) -> 
None:
+        super().__init__(None)
+        self._function_name = function_name
+        self._db_name = db_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        plan.get_function.function_name = self._function_name
+        if self._db_name is not None:
+            plan.get_function.db_name = self._db_name
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return (
+            f"{i}"
+            f"<{self.__class__.__name__}"
+            f"function_name='{self._function_name}' "
+            f"db_name='{self._db_name}'>"
+        )
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              function_name: {self._function_name} <br />
+              db_name: {self._db_name} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class DatabaseExists(LogicalPlan):
+    def __init__(self, db_name: str) -> None:
+        super().__init__(None)
+        self._db_name = db_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        plan.database_exists.db_name = self._db_name
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return f"{i}" f"<{self.__class__.__name__} db_name='{self._db_name}'>"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              db_name: {self._db_name} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class TableExists(LogicalPlan):
+    def __init__(self, table_name: str, db_name: Optional[str] = None) -> None:
+        super().__init__(None)
+        self._table_name = table_name
+        self._db_name = db_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        plan.table_exists.table_name = self._table_name
+        if self._db_name is not None:
+            plan.table_exists.db_name = self._db_name
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return (
+            f"{i}"
+            f"<{self.__class__.__name__}"
+            f"table_name='{self._table_name}' "
+            f"db_name='{self._db_name}'>"
+        )
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              table_name: {self._table_name} <br />
+              db_name: {self._db_name} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class FunctionExists(LogicalPlan):
+    def __init__(self, function_name: str, db_name: Optional[str] = None) -> 
None:
+        super().__init__(None)
+        self._function_name = function_name
+        self._db_name = db_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        plan.function_exists.function_name = self._function_name
+        if self._db_name is not None:
+            plan.function_exists.db_name = self._db_name
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return (
+            f"{i}"
+            f"<{self.__class__.__name__}"
+            f"function_name='{self._function_name}' "
+            f"db_name='{self._db_name}'>"
+        )
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              function_name: {self._function_name} <br />
+              db_name: {self._db_name} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class CreateExternalTable(LogicalPlan):
+    def __init__(
+        self,
+        table_name: str,
+        path: str,
+        source: Optional[str] = None,
+        schema: Optional[DataType] = None,
+        options: Mapping[str, str] = {},
+    ) -> None:
+        super().__init__(None)
+        self._table_name = table_name
+        self._path = path
+        self._source = source
+        self._schema = schema
+        self._options = options
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        plan.create_external_table.table_name = self._table_name
+        if self._path is not None:
+            plan.create_external_table.path = self._path
+        if self._source is not None:
+            plan.create_external_table.source = self._source
+        if self._schema is not None:
+            
plan.create_external_table.schema.CopyFrom(pyspark_types_to_proto_types(self._schema))
+        for k in self._options.keys():
+            v = self._options.get(k)
+            if v is not None:
+                plan.create_external_table.options[k] = v
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return (
+            f"{i}"
+            f"<{self.__class__.__name__}"
+            f"table_name='{self._table_name}' "
+            f"path='{self._path}' "
+            f"source='{self._source}' "
+            f"schema='{self._schema}' "
+            f"options='{self._options}'>"
+        )
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              table_name: {self._table_name} <br />
+              path: {self._path} <br />
+              source: {self._source} <br />
+              schema: {self._schema} <br />
+              options: {self._options} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class CreateTable(LogicalPlan):
+    def __init__(
+        self,
+        table_name: str,
+        path: str,
+        source: Optional[str] = None,
+        description: Optional[str] = None,
+        schema: Optional[DataType] = None,
+        options: Mapping[str, str] = {},
+    ) -> None:
+        super().__init__(None)
+        self._table_name = table_name
+        self._path = path
+        self._source = source
+        self._description = description
+        self._schema = schema
+        self._options = options
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        plan.create_table.table_name = self._table_name
+        if self._path is not None:
+            plan.create_table.path = self._path
+        if self._source is not None:
+            plan.create_table.source = self._source
+        if self._description is not None:
+            plan.create_table.description = self._description
+        if self._schema is not None:
+            plan.create_table.schema.CopyFrom(pyspark_types_to_proto_types(self._schema))
+        for k in self._options.keys():
+            v = self._options.get(k)
+            if v is not None:
+                plan.create_table.options[k] = v
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return (
+            f"{i}"
+            f"<{self.__class__.__name__}"
+            f"table_name='{self._table_name}' "
+            f"path='{self._path}' "
+            f"source='{self._source}' "
+            f"description='{self._description}' "
+            f"schema='{self._schema}' "
+            f"options='{self._options}'>"
+        )
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              table_name: {self._table_name} <br />
+              path: {self._path} <br />
+              source: {self._source} <br />
+              description: {self._description} <br />
+              schema: {self._schema} <br />
+              options: {self._options} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class DropTempView(LogicalPlan):
+    def __init__(self, view_name: str) -> None:
+        super().__init__(None)
+        self._view_name = view_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        plan.drop_temp_view.view_name = self._view_name
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return f"{i}" f"<{self.__class__.__name__} 
view_name='{self._view_name}'>"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              view_name: {self._view_name} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class DropGlobalTempView(LogicalPlan):
+    def __init__(self, view_name: str) -> None:
+        super().__init__(None)
+        self._view_name = view_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        plan.drop_global_temp_view.view_name = self._view_name
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return f"{i}" f"<{self.__class__.__name__} 
view_name='{self._view_name}'>"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              view_name: {self._view_name} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class RecoverPartitions(LogicalPlan):
+    def __init__(self, table_name: str) -> None:
+        super().__init__(None)
+        self._table_name = table_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        plan.recover_partitions.table_name = self._table_name
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return f"{i}" f"<{self.__class__.__name__} 
table_name='{self._table_name}'>"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              table_name: {self._table_name} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+# TODO(SPARK-41612): Support Catalog.isCached
+# class IsCached(LogicalPlan):
+#     def __init__(self, table_name: str) -> None:
+#         super().__init__(None)
+#         self._table_name = table_name
+#
+#     def plan(self, session: "SparkConnectClient") -> proto.Relation:
+#         plan = proto.Relation()
+#         plan.is_cached.table_name = self._table_name
+#         return plan
+#
+#     def print(self, indent: int = 0) -> str:
+#         i = " " * indent
+#         return f"{i}" f"<{self.__class__.__name__} 
table_name='{self._table_name}'>"
+#
+#     def _repr_html_(self) -> str:
+#         return f"""
+#         <ul>
+#            <li>
+#               <b>{self.__class__.__name__}</b><br />
+#               table_name: {self._table_name} <br />
+#             {self._child_repr_()}
+#            </li>
+#         </ul>
+#         """
+#
+#
+# TODO(SPARK-41600): Support Catalog.cacheTable
+# class CacheTable(LogicalPlan):
+#     def __init__(self, table_name: str) -> None:
+#         super().__init__(None)
+#         self._table_name = table_name
+#
+#     def plan(self, session: "SparkConnectClient") -> proto.Relation:
+#         plan = proto.Relation()
+#         plan.cache_table.table_name = self._table_name
+#         return plan
+#
+#     def print(self, indent: int = 0) -> str:
+#         i = " " * indent
+#         return f"{i}" f"<{self.__class__.__name__} 
table_name='{self._table_name}'>"
+#
+#     def _repr_html_(self) -> str:
+#         return f"""
+#         <ul>
+#            <li>
+#               <b>{self.__class__.__name__}</b><br />
+#               table_name: {self._table_name} <br />
+#             {self._child_repr_()}
+#            </li>
+#         </ul>
+#         """
+#
+#
+# TODO(SPARK-41623): Support Catalog.uncacheTable
+# class UncacheTable(LogicalPlan):
+#     def __init__(self, table_name: str) -> None:
+#         super().__init__(None)
+#         self._table_name = table_name
+#
+#     def plan(self, session: "SparkConnectClient") -> proto.Relation:
+#         plan = proto.Relation()
+#         plan.uncache_table.table_name = self._table_name
+#         return plan
+#
+#     def print(self, indent: int = 0) -> str:
+#         i = " " * indent
+#         return f"{i}" f"<{self.__class__.__name__} 
table_name='{self._table_name}'>"
+#
+#     def _repr_html_(self) -> str:
+#         return f"""
+#         <ul>
+#            <li>
+#               <b>{self.__class__.__name__}</b><br />
+#               table_name: {self._table_name} <br />
+#             {self._child_repr_()}
+#            </li>
+#         </ul>
+#         """
+
+
+class ClearCache(LogicalPlan):
+    def __init__(self) -> None:
+        super().__init__(None)
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        return proto.Relation(clear_cache=proto.ClearCache())
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return f"{i}" f"<{self.__class__.__name__}>"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b>
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class RefreshTable(LogicalPlan):
+    def __init__(self, table_name: str) -> None:
+        super().__init__(None)
+        self._table_name = table_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        plan.refresh_table.table_name = self._table_name
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return f"{i}" f"<{self.__class__.__name__} 
table_name='{self._table_name}'>"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              table_name: {self._table_name} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class RefreshByPath(LogicalPlan):
+    def __init__(self, path: str) -> None:
+        super().__init__(None)
+        self._path = path
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        plan.refresh_by_path.path = self._path
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return f"{i}" f"<{self.__class__.__name__} path='{self._path}'>"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              path: {self._path} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class CurrentCatalog(LogicalPlan):
+    def __init__(self) -> None:
+        super().__init__(None)
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        return proto.Relation(current_catalog=proto.CurrentCatalog())
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return f"{i}" f"<{self.__class__.__name__}>"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b>
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class SetCurrentCatalog(LogicalPlan):
+    def __init__(self, catalog_name: str) -> None:
+        super().__init__(None)
+        self._catalog_name = catalog_name
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        plan = proto.Relation()
+        plan.set_current_catalog.catalog_name = self._catalog_name
+        return plan
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return f"{i}" f"<{self.__class__.__name__} 
catalog_name='{self._catalog_name}'>"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b><br />
+              catalog_name: {self._catalog_name} <br />
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
+
+
+class ListCatalogs(LogicalPlan):
+    def __init__(self) -> None:
+        super().__init__(None)
+
+    def plan(self, session: "SparkConnectClient") -> proto.Relation:
+        return proto.Relation(list_catalogs=proto.ListCatalogs())
+
+    def print(self, indent: int = 0) -> str:
+        i = " " * indent
+        return f"{i}" f"<{self.__class__.__name__}>"
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>{self.__class__.__name__}</b>
+            {self._child_repr_()}
+           </li>
+        </ul>
+        """
diff --git a/python/pyspark/sql/connect/proto/__init__.py b/python/pyspark/sql/connect/proto/__init__.py
index f00b1a74c1d..a651a844e15 100644
--- a/python/pyspark/sql/connect/proto/__init__.py
+++ b/python/pyspark/sql/connect/proto/__init__.py
@@ -21,3 +21,4 @@ from pyspark.sql.connect.proto.types_pb2 import *
 from pyspark.sql.connect.proto.commands_pb2 import *
 from pyspark.sql.connect.proto.expressions_pb2 import *
 from pyspark.sql.connect.proto.relations_pb2 import *
+from pyspark.sql.connect.proto.catalog_pb2 import *
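
With the extra wildcard import, the generated catalog messages resolve from the same package-level namespace as the existing relation messages. A quick illustration (not part of the patch):

    from pyspark.sql.connect import proto

    # Both the Relation wrapper and the new catalog payloads are importable
    # from one place after this change.
    rel = proto.Relation(list_databases=proto.ListDatabases())
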
diff --git a/python/pyspark/sql/connect/proto/catalog_pb2.py b/python/pyspark/sql/connect/proto/catalog_pb2.py
new file mode 100644
index 00000000000..65b3e4e584e
--- /dev/null
+++ b/python/pyspark/sql/connect/proto/catalog_pb2.py
@@ -0,0 +1,396 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: spark/connect/catalog.proto
+"""Generated protocol buffer code."""
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__pb2
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
+    
b'\n\x1bspark/connect/catalog.proto\x12\rspark.connect\x1a\x19spark/connect/types.proto"\x11\n\x0f\x43urrentDatabase"-\n\x12SetCurrentDatabase\x12\x17\n\x07\x64\x62_name\x18\x01
 
\x01(\tR\x06\x64\x62Name"\x0f\n\rListDatabases"6\n\nListTables\x12\x1c\n\x07\x64\x62_name\x18\x01
 
\x01(\tH\x00R\x06\x64\x62Name\x88\x01\x01\x42\n\n\x08_db_name"9\n\rListFunctions\x12\x1c\n\x07\x64\x62_name\x18\x01
 
\x01(\tH\x00R\x06\x64\x62Name\x88\x01\x01\x42\n\n\x08_db_name"V\n\x0bListColumns\x12\x1d\n\ntabl
 [...]
+)
+
+
+_CURRENTDATABASE = DESCRIPTOR.message_types_by_name["CurrentDatabase"]
+_SETCURRENTDATABASE = DESCRIPTOR.message_types_by_name["SetCurrentDatabase"]
+_LISTDATABASES = DESCRIPTOR.message_types_by_name["ListDatabases"]
+_LISTTABLES = DESCRIPTOR.message_types_by_name["ListTables"]
+_LISTFUNCTIONS = DESCRIPTOR.message_types_by_name["ListFunctions"]
+_LISTCOLUMNS = DESCRIPTOR.message_types_by_name["ListColumns"]
+_GETDATABASE = DESCRIPTOR.message_types_by_name["GetDatabase"]
+_GETTABLE = DESCRIPTOR.message_types_by_name["GetTable"]
+_GETFUNCTION = DESCRIPTOR.message_types_by_name["GetFunction"]
+_DATABASEEXISTS = DESCRIPTOR.message_types_by_name["DatabaseExists"]
+_TABLEEXISTS = DESCRIPTOR.message_types_by_name["TableExists"]
+_FUNCTIONEXISTS = DESCRIPTOR.message_types_by_name["FunctionExists"]
+_CREATEEXTERNALTABLE = DESCRIPTOR.message_types_by_name["CreateExternalTable"]
+_CREATEEXTERNALTABLE_OPTIONSENTRY = _CREATEEXTERNALTABLE.nested_types_by_name["OptionsEntry"]
+_CREATETABLE = DESCRIPTOR.message_types_by_name["CreateTable"]
+_CREATETABLE_OPTIONSENTRY = _CREATETABLE.nested_types_by_name["OptionsEntry"]
+_DROPTEMPVIEW = DESCRIPTOR.message_types_by_name["DropTempView"]
+_DROPGLOBALTEMPVIEW = DESCRIPTOR.message_types_by_name["DropGlobalTempView"]
+_RECOVERPARTITIONS = DESCRIPTOR.message_types_by_name["RecoverPartitions"]
+_CLEARCACHE = DESCRIPTOR.message_types_by_name["ClearCache"]
+_REFRESHTABLE = DESCRIPTOR.message_types_by_name["RefreshTable"]
+_REFRESHBYPATH = DESCRIPTOR.message_types_by_name["RefreshByPath"]
+_CURRENTCATALOG = DESCRIPTOR.message_types_by_name["CurrentCatalog"]
+_SETCURRENTCATALOG = DESCRIPTOR.message_types_by_name["SetCurrentCatalog"]
+_LISTCATALOGS = DESCRIPTOR.message_types_by_name["ListCatalogs"]
+CurrentDatabase = _reflection.GeneratedProtocolMessageType(
+    "CurrentDatabase",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _CURRENTDATABASE,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.CurrentDatabase)
+    },
+)
+_sym_db.RegisterMessage(CurrentDatabase)
+
+SetCurrentDatabase = _reflection.GeneratedProtocolMessageType(
+    "SetCurrentDatabase",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _SETCURRENTDATABASE,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.SetCurrentDatabase)
+    },
+)
+_sym_db.RegisterMessage(SetCurrentDatabase)
+
+ListDatabases = _reflection.GeneratedProtocolMessageType(
+    "ListDatabases",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _LISTDATABASES,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.ListDatabases)
+    },
+)
+_sym_db.RegisterMessage(ListDatabases)
+
+ListTables = _reflection.GeneratedProtocolMessageType(
+    "ListTables",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _LISTTABLES,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.ListTables)
+    },
+)
+_sym_db.RegisterMessage(ListTables)
+
+ListFunctions = _reflection.GeneratedProtocolMessageType(
+    "ListFunctions",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _LISTFUNCTIONS,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.ListFunctions)
+    },
+)
+_sym_db.RegisterMessage(ListFunctions)
+
+ListColumns = _reflection.GeneratedProtocolMessageType(
+    "ListColumns",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _LISTCOLUMNS,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.ListColumns)
+    },
+)
+_sym_db.RegisterMessage(ListColumns)
+
+GetDatabase = _reflection.GeneratedProtocolMessageType(
+    "GetDatabase",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _GETDATABASE,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.GetDatabase)
+    },
+)
+_sym_db.RegisterMessage(GetDatabase)
+
+GetTable = _reflection.GeneratedProtocolMessageType(
+    "GetTable",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _GETTABLE,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.GetTable)
+    },
+)
+_sym_db.RegisterMessage(GetTable)
+
+GetFunction = _reflection.GeneratedProtocolMessageType(
+    "GetFunction",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _GETFUNCTION,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.GetFunction)
+    },
+)
+_sym_db.RegisterMessage(GetFunction)
+
+DatabaseExists = _reflection.GeneratedProtocolMessageType(
+    "DatabaseExists",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _DATABASEEXISTS,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.DatabaseExists)
+    },
+)
+_sym_db.RegisterMessage(DatabaseExists)
+
+TableExists = _reflection.GeneratedProtocolMessageType(
+    "TableExists",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _TABLEEXISTS,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.TableExists)
+    },
+)
+_sym_db.RegisterMessage(TableExists)
+
+FunctionExists = _reflection.GeneratedProtocolMessageType(
+    "FunctionExists",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _FUNCTIONEXISTS,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.FunctionExists)
+    },
+)
+_sym_db.RegisterMessage(FunctionExists)
+
+CreateExternalTable = _reflection.GeneratedProtocolMessageType(
+    "CreateExternalTable",
+    (_message.Message,),
+    {
+        "OptionsEntry": _reflection.GeneratedProtocolMessageType(
+            "OptionsEntry",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _CREATEEXTERNALTABLE_OPTIONSENTRY,
+                "__module__": "spark.connect.catalog_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.CreateExternalTable.OptionsEntry)
+            },
+        ),
+        "DESCRIPTOR": _CREATEEXTERNALTABLE,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.CreateExternalTable)
+    },
+)
+_sym_db.RegisterMessage(CreateExternalTable)
+_sym_db.RegisterMessage(CreateExternalTable.OptionsEntry)
+
+CreateTable = _reflection.GeneratedProtocolMessageType(
+    "CreateTable",
+    (_message.Message,),
+    {
+        "OptionsEntry": _reflection.GeneratedProtocolMessageType(
+            "OptionsEntry",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _CREATETABLE_OPTIONSENTRY,
+                "__module__": "spark.connect.catalog_pb2"
+                # @@protoc_insertion_point(class_scope:spark.connect.CreateTable.OptionsEntry)
+            },
+        ),
+        "DESCRIPTOR": _CREATETABLE,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.CreateTable)
+    },
+)
+_sym_db.RegisterMessage(CreateTable)
+_sym_db.RegisterMessage(CreateTable.OptionsEntry)
+
+DropTempView = _reflection.GeneratedProtocolMessageType(
+    "DropTempView",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _DROPTEMPVIEW,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.DropTempView)
+    },
+)
+_sym_db.RegisterMessage(DropTempView)
+
+DropGlobalTempView = _reflection.GeneratedProtocolMessageType(
+    "DropGlobalTempView",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _DROPGLOBALTEMPVIEW,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.DropGlobalTempView)
+    },
+)
+_sym_db.RegisterMessage(DropGlobalTempView)
+
+RecoverPartitions = _reflection.GeneratedProtocolMessageType(
+    "RecoverPartitions",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _RECOVERPARTITIONS,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.RecoverPartitions)
+    },
+)
+_sym_db.RegisterMessage(RecoverPartitions)
+
+ClearCache = _reflection.GeneratedProtocolMessageType(
+    "ClearCache",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _CLEARCACHE,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.ClearCache)
+    },
+)
+_sym_db.RegisterMessage(ClearCache)
+
+RefreshTable = _reflection.GeneratedProtocolMessageType(
+    "RefreshTable",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _REFRESHTABLE,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.RefreshTable)
+    },
+)
+_sym_db.RegisterMessage(RefreshTable)
+
+RefreshByPath = _reflection.GeneratedProtocolMessageType(
+    "RefreshByPath",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _REFRESHBYPATH,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.RefreshByPath)
+    },
+)
+_sym_db.RegisterMessage(RefreshByPath)
+
+CurrentCatalog = _reflection.GeneratedProtocolMessageType(
+    "CurrentCatalog",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _CURRENTCATALOG,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.CurrentCatalog)
+    },
+)
+_sym_db.RegisterMessage(CurrentCatalog)
+
+SetCurrentCatalog = _reflection.GeneratedProtocolMessageType(
+    "SetCurrentCatalog",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _SETCURRENTCATALOG,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.SetCurrentCatalog)
+    },
+)
+_sym_db.RegisterMessage(SetCurrentCatalog)
+
+ListCatalogs = _reflection.GeneratedProtocolMessageType(
+    "ListCatalogs",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _LISTCATALOGS,
+        "__module__": "spark.connect.catalog_pb2"
+        # @@protoc_insertion_point(class_scope:spark.connect.ListCatalogs)
+    },
+)
+_sym_db.RegisterMessage(ListCatalogs)
+
+if _descriptor._USE_C_DESCRIPTORS == False:
+
+    DESCRIPTOR._options = None
+    DESCRIPTOR._serialized_options = b"\n\036org.apache.spark.connect.protoP\001"
+    _CREATEEXTERNALTABLE_OPTIONSENTRY._options = None
+    _CREATEEXTERNALTABLE_OPTIONSENTRY._serialized_options = b"8\001"
+    _CREATETABLE_OPTIONSENTRY._options = None
+    _CREATETABLE_OPTIONSENTRY._serialized_options = b"8\001"
+    _CURRENTDATABASE._serialized_start = 73
+    _CURRENTDATABASE._serialized_end = 90
+    _SETCURRENTDATABASE._serialized_start = 92
+    _SETCURRENTDATABASE._serialized_end = 137
+    _LISTDATABASES._serialized_start = 139
+    _LISTDATABASES._serialized_end = 154
+    _LISTTABLES._serialized_start = 156
+    _LISTTABLES._serialized_end = 210
+    _LISTFUNCTIONS._serialized_start = 212
+    _LISTFUNCTIONS._serialized_end = 269
+    _LISTCOLUMNS._serialized_start = 271
+    _LISTCOLUMNS._serialized_end = 357
+    _GETDATABASE._serialized_start = 359
+    _GETDATABASE._serialized_end = 397
+    _GETTABLE._serialized_start = 399
+    _GETTABLE._serialized_end = 482
+    _GETFUNCTION._serialized_start = 484
+    _GETFUNCTION._serialized_end = 576
+    _DATABASEEXISTS._serialized_start = 578
+    _DATABASEEXISTS._serialized_end = 619
+    _TABLEEXISTS._serialized_start = 621
+    _TABLEEXISTS._serialized_end = 707
+    _FUNCTIONEXISTS._serialized_start = 709
+    _FUNCTIONEXISTS._serialized_end = 804
+    _CREATEEXTERNALTABLE._serialized_start = 807
+    _CREATEEXTERNALTABLE._serialized_end = 1133
+    _CREATEEXTERNALTABLE_OPTIONSENTRY._serialized_start = 1044
+    _CREATEEXTERNALTABLE_OPTIONSENTRY._serialized_end = 1102
+    _CREATETABLE._serialized_start = 1136
+    _CREATETABLE._serialized_end = 1501
+    _CREATETABLE_OPTIONSENTRY._serialized_start = 1044
+    _CREATETABLE_OPTIONSENTRY._serialized_end = 1102
+    _DROPTEMPVIEW._serialized_start = 1503
+    _DROPTEMPVIEW._serialized_end = 1546
+    _DROPGLOBALTEMPVIEW._serialized_start = 1548
+    _DROPGLOBALTEMPVIEW._serialized_end = 1597
+    _RECOVERPARTITIONS._serialized_start = 1599
+    _RECOVERPARTITIONS._serialized_end = 1649
+    _CLEARCACHE._serialized_start = 1651
+    _CLEARCACHE._serialized_end = 1663
+    _REFRESHTABLE._serialized_start = 1665
+    _REFRESHTABLE._serialized_end = 1710
+    _REFRESHBYPATH._serialized_start = 1712
+    _REFRESHBYPATH._serialized_end = 1747
+    _CURRENTCATALOG._serialized_start = 1749
+    _CURRENTCATALOG._serialized_end = 1765
+    _SETCURRENTCATALOG._serialized_start = 1767
+    _SETCURRENTCATALOG._serialized_end = 1821
+    _LISTCATALOGS._serialized_start = 1823
+    _LISTCATALOGS._serialized_end = 1837
+# @@protoc_insertion_point(module_scope)
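
As a quick sanity check of the generated module (illustrative only, not part of the commit), the classes behave like any other protobuf messages: map fields accept plain dicts, and optional scalars are only set when assigned.

    from pyspark.sql.connect.proto import catalog_pb2

    msg = catalog_pb2.CreateExternalTable(
        table_name="people",
        path="/tmp/people",
        source="parquet",
        options={"mergeSchema": "true"},
    )
    data = msg.SerializeToString()
    assert catalog_pb2.CreateExternalTable.FromString(data) == msg
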
diff --git a/python/pyspark/sql/connect/proto/catalog_pb2.pyi b/python/pyspark/sql/connect/proto/catalog_pb2.pyi
new file mode 100644
index 00000000000..1322d41f472
--- /dev/null
+++ b/python/pyspark/sql/connect/proto/catalog_pb2.pyi
@@ -0,0 +1,722 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+@generated by mypy-protobuf.  Do not edit manually!
+isort:skip_file
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import builtins
+import collections.abc
+import google.protobuf.descriptor
+import google.protobuf.internal.containers
+import google.protobuf.message
+import pyspark.sql.connect.proto.types_pb2
+import sys
+import typing
+
+if sys.version_info >= (3, 8):
+    import typing as typing_extensions
+else:
+    import typing_extensions
+
+DESCRIPTOR: google.protobuf.descriptor.FileDescriptor
+
+class CurrentDatabase(google.protobuf.message.Message):
+    """See `spark.catalog.currentDatabase`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    def __init__(
+        self,
+    ) -> None: ...
+
+global___CurrentDatabase = CurrentDatabase
+
+class SetCurrentDatabase(google.protobuf.message.Message):
+    """See `spark.catalog.setCurrentDatabase`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    DB_NAME_FIELD_NUMBER: builtins.int
+    db_name: builtins.str
+    """(Required)"""
+    def __init__(
+        self,
+        *,
+        db_name: builtins.str = ...,
+    ) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["db_name", 
b"db_name"]) -> None: ...
+
+global___SetCurrentDatabase = SetCurrentDatabase
+
+class ListDatabases(google.protobuf.message.Message):
+    """See `spark.catalog.listDatabases`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    def __init__(
+        self,
+    ) -> None: ...
+
+global___ListDatabases = ListDatabases
+
+class ListTables(google.protobuf.message.Message):
+    """See `spark.catalog.listTables`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    DB_NAME_FIELD_NUMBER: builtins.int
+    db_name: builtins.str
+    """(Optional)"""
+    def __init__(
+        self,
+        *,
+        db_name: builtins.str | None = ...,
+    ) -> None: ...
+    def HasField(
+        self, field_name: typing_extensions.Literal["_db_name", b"_db_name", 
"db_name", b"db_name"]
+    ) -> builtins.bool: ...
+    def ClearField(
+        self, field_name: typing_extensions.Literal["_db_name", b"_db_name", 
"db_name", b"db_name"]
+    ) -> None: ...
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_db_name", b"_db_name"]
+    ) -> typing_extensions.Literal["db_name"] | None: ...
+
+global___ListTables = ListTables
+
+class ListFunctions(google.protobuf.message.Message):
+    """See `spark.catalog.listFunctions`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    DB_NAME_FIELD_NUMBER: builtins.int
+    db_name: builtins.str
+    """(Optional)"""
+    def __init__(
+        self,
+        *,
+        db_name: builtins.str | None = ...,
+    ) -> None: ...
+    def HasField(
+        self, field_name: typing_extensions.Literal["_db_name", b"_db_name", 
"db_name", b"db_name"]
+    ) -> builtins.bool: ...
+    def ClearField(
+        self, field_name: typing_extensions.Literal["_db_name", b"_db_name", 
"db_name", b"db_name"]
+    ) -> None: ...
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_db_name", b"_db_name"]
+    ) -> typing_extensions.Literal["db_name"] | None: ...
+
+global___ListFunctions = ListFunctions
+
+class ListColumns(google.protobuf.message.Message):
+    """See `spark.catalog.listColumns`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    TABLE_NAME_FIELD_NUMBER: builtins.int
+    DB_NAME_FIELD_NUMBER: builtins.int
+    table_name: builtins.str
+    """(Required)"""
+    db_name: builtins.str
+    """(Optional)"""
+    def __init__(
+        self,
+        *,
+        table_name: builtins.str = ...,
+        db_name: builtins.str | None = ...,
+    ) -> None: ...
+    def HasField(
+        self, field_name: typing_extensions.Literal["_db_name", b"_db_name", 
"db_name", b"db_name"]
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_db_name", b"_db_name", "db_name", b"db_name", "table_name", 
b"table_name"
+        ],
+    ) -> None: ...
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_db_name", b"_db_name"]
+    ) -> typing_extensions.Literal["db_name"] | None: ...
+
+global___ListColumns = ListColumns
+
+class GetDatabase(google.protobuf.message.Message):
+    """See `spark.catalog.getDatabase`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    DB_NAME_FIELD_NUMBER: builtins.int
+    db_name: builtins.str
+    """(Required)"""
+    def __init__(
+        self,
+        *,
+        db_name: builtins.str = ...,
+    ) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["db_name", 
b"db_name"]) -> None: ...
+
+global___GetDatabase = GetDatabase
+
+class GetTable(google.protobuf.message.Message):
+    """See `spark.catalog.getTable`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    TABLE_NAME_FIELD_NUMBER: builtins.int
+    DB_NAME_FIELD_NUMBER: builtins.int
+    table_name: builtins.str
+    """(Required)"""
+    db_name: builtins.str
+    """(Optional)"""
+    def __init__(
+        self,
+        *,
+        table_name: builtins.str = ...,
+        db_name: builtins.str | None = ...,
+    ) -> None: ...
+    def HasField(
+        self, field_name: typing_extensions.Literal["_db_name", b"_db_name", 
"db_name", b"db_name"]
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_db_name", b"_db_name", "db_name", b"db_name", "table_name", 
b"table_name"
+        ],
+    ) -> None: ...
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_db_name", b"_db_name"]
+    ) -> typing_extensions.Literal["db_name"] | None: ...
+
+global___GetTable = GetTable
+
+class GetFunction(google.protobuf.message.Message):
+    """See `spark.catalog.getFunction`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    FUNCTION_NAME_FIELD_NUMBER: builtins.int
+    DB_NAME_FIELD_NUMBER: builtins.int
+    function_name: builtins.str
+    """(Required)"""
+    db_name: builtins.str
+    """(Optional)"""
+    def __init__(
+        self,
+        *,
+        function_name: builtins.str = ...,
+        db_name: builtins.str | None = ...,
+    ) -> None: ...
+    def HasField(
+        self, field_name: typing_extensions.Literal["_db_name", b"_db_name", 
"db_name", b"db_name"]
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_db_name", b"_db_name", "db_name", b"db_name", "function_name", 
b"function_name"
+        ],
+    ) -> None: ...
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_db_name", b"_db_name"]
+    ) -> typing_extensions.Literal["db_name"] | None: ...
+
+global___GetFunction = GetFunction
+
+class DatabaseExists(google.protobuf.message.Message):
+    """See `spark.catalog.databaseExists`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    DB_NAME_FIELD_NUMBER: builtins.int
+    db_name: builtins.str
+    """(Required)"""
+    def __init__(
+        self,
+        *,
+        db_name: builtins.str = ...,
+    ) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["db_name", 
b"db_name"]) -> None: ...
+
+global___DatabaseExists = DatabaseExists
+
+class TableExists(google.protobuf.message.Message):
+    """See `spark.catalog.tableExists`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    TABLE_NAME_FIELD_NUMBER: builtins.int
+    DB_NAME_FIELD_NUMBER: builtins.int
+    table_name: builtins.str
+    """(Required)"""
+    db_name: builtins.str
+    """(Optional)"""
+    def __init__(
+        self,
+        *,
+        table_name: builtins.str = ...,
+        db_name: builtins.str | None = ...,
+    ) -> None: ...
+    def HasField(
+        self, field_name: typing_extensions.Literal["_db_name", b"_db_name", 
"db_name", b"db_name"]
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_db_name", b"_db_name", "db_name", b"db_name", "table_name", 
b"table_name"
+        ],
+    ) -> None: ...
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_db_name", b"_db_name"]
+    ) -> typing_extensions.Literal["db_name"] | None: ...
+
+global___TableExists = TableExists
+
+class FunctionExists(google.protobuf.message.Message):
+    """See `spark.catalog.functionExists`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    FUNCTION_NAME_FIELD_NUMBER: builtins.int
+    DB_NAME_FIELD_NUMBER: builtins.int
+    function_name: builtins.str
+    """(Required)"""
+    db_name: builtins.str
+    """(Optional)"""
+    def __init__(
+        self,
+        *,
+        function_name: builtins.str = ...,
+        db_name: builtins.str | None = ...,
+    ) -> None: ...
+    def HasField(
+        self, field_name: typing_extensions.Literal["_db_name", b"_db_name", 
"db_name", b"db_name"]
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_db_name", b"_db_name", "db_name", b"db_name", "function_name", 
b"function_name"
+        ],
+    ) -> None: ...
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_db_name", b"_db_name"]
+    ) -> typing_extensions.Literal["db_name"] | None: ...
+
+global___FunctionExists = FunctionExists
+
+class CreateExternalTable(google.protobuf.message.Message):
+    """See `spark.catalog.createExternalTable`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    class OptionsEntry(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        KEY_FIELD_NUMBER: builtins.int
+        VALUE_FIELD_NUMBER: builtins.int
+        key: builtins.str
+        value: builtins.str
+        def __init__(
+            self,
+            *,
+            key: builtins.str = ...,
+            value: builtins.str = ...,
+        ) -> None: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["key", b"key", 
"value", b"value"]
+        ) -> None: ...
+
+    TABLE_NAME_FIELD_NUMBER: builtins.int
+    PATH_FIELD_NUMBER: builtins.int
+    SOURCE_FIELD_NUMBER: builtins.int
+    SCHEMA_FIELD_NUMBER: builtins.int
+    OPTIONS_FIELD_NUMBER: builtins.int
+    table_name: builtins.str
+    """(Required)"""
+    path: builtins.str
+    """(Optional)"""
+    source: builtins.str
+    """(Optional)"""
+    @property
+    def schema(self) -> pyspark.sql.connect.proto.types_pb2.DataType:
+        """(Optional)"""
+    @property
+    def options(self) -> 
google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]:
+        """Options could be empty for valid data source format.
+        The map key is case insensitive.
+        """
+    def __init__(
+        self,
+        *,
+        table_name: builtins.str = ...,
+        path: builtins.str | None = ...,
+        source: builtins.str | None = ...,
+        schema: pyspark.sql.connect.proto.types_pb2.DataType | None = ...,
+        options: collections.abc.Mapping[builtins.str, builtins.str] | None = 
...,
+    ) -> None: ...
+    def HasField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_path",
+            b"_path",
+            "_schema",
+            b"_schema",
+            "_source",
+            b"_source",
+            "path",
+            b"path",
+            "schema",
+            b"schema",
+            "source",
+            b"source",
+        ],
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_path",
+            b"_path",
+            "_schema",
+            b"_schema",
+            "_source",
+            b"_source",
+            "options",
+            b"options",
+            "path",
+            b"path",
+            "schema",
+            b"schema",
+            "source",
+            b"source",
+            "table_name",
+            b"table_name",
+        ],
+    ) -> None: ...
+    @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_path", b"_path"]
+    ) -> typing_extensions.Literal["path"] | None: ...
+    @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_schema", b"_schema"]
+    ) -> typing_extensions.Literal["schema"] | None: ...
+    @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_source", b"_source"]
+    ) -> typing_extensions.Literal["source"] | None: ...
+
+global___CreateExternalTable = CreateExternalTable
+
+class CreateTable(google.protobuf.message.Message):
+    """See `spark.catalog.createTable`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    class OptionsEntry(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        KEY_FIELD_NUMBER: builtins.int
+        VALUE_FIELD_NUMBER: builtins.int
+        key: builtins.str
+        value: builtins.str
+        def __init__(
+            self,
+            *,
+            key: builtins.str = ...,
+            value: builtins.str = ...,
+        ) -> None: ...
+        def ClearField(
+            self, field_name: typing_extensions.Literal["key", b"key", 
"value", b"value"]
+        ) -> None: ...
+
+    TABLE_NAME_FIELD_NUMBER: builtins.int
+    PATH_FIELD_NUMBER: builtins.int
+    SOURCE_FIELD_NUMBER: builtins.int
+    DESCRIPTION_FIELD_NUMBER: builtins.int
+    SCHEMA_FIELD_NUMBER: builtins.int
+    OPTIONS_FIELD_NUMBER: builtins.int
+    table_name: builtins.str
+    """(Required)"""
+    path: builtins.str
+    """(Optional)"""
+    source: builtins.str
+    """(Optional)"""
+    description: builtins.str
+    """(Optional)"""
+    @property
+    def schema(self) -> pyspark.sql.connect.proto.types_pb2.DataType:
+        """(Optional)"""
+    @property
+    def options(self) -> 
google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]:
+        """Options could be empty for valid data source format.
+        The map key is case insensitive.
+        """
+    def __init__(
+        self,
+        *,
+        table_name: builtins.str = ...,
+        path: builtins.str | None = ...,
+        source: builtins.str | None = ...,
+        description: builtins.str | None = ...,
+        schema: pyspark.sql.connect.proto.types_pb2.DataType | None = ...,
+        options: collections.abc.Mapping[builtins.str, builtins.str] | None = 
...,
+    ) -> None: ...
+    def HasField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_description",
+            b"_description",
+            "_path",
+            b"_path",
+            "_schema",
+            b"_schema",
+            "_source",
+            b"_source",
+            "description",
+            b"description",
+            "path",
+            b"path",
+            "schema",
+            b"schema",
+            "source",
+            b"source",
+        ],
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal[
+            "_description",
+            b"_description",
+            "_path",
+            b"_path",
+            "_schema",
+            b"_schema",
+            "_source",
+            b"_source",
+            "description",
+            b"description",
+            "options",
+            b"options",
+            "path",
+            b"path",
+            "schema",
+            b"schema",
+            "source",
+            b"source",
+            "table_name",
+            b"table_name",
+        ],
+    ) -> None: ...
+    @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_description", 
b"_description"]
+    ) -> typing_extensions.Literal["description"] | None: ...
+    @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_path", b"_path"]
+    ) -> typing_extensions.Literal["path"] | None: ...
+    @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_schema", b"_schema"]
+    ) -> typing_extensions.Literal["schema"] | None: ...
+    @typing.overload
+    def WhichOneof(
+        self, oneof_group: typing_extensions.Literal["_source", b"_source"]
+    ) -> typing_extensions.Literal["source"] | None: ...
+
+global___CreateTable = CreateTable
+
+class DropTempView(google.protobuf.message.Message):
+    """See `spark.catalog.dropTempView`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    VIEW_NAME_FIELD_NUMBER: builtins.int
+    view_name: builtins.str
+    """(Required)"""
+    def __init__(
+        self,
+        *,
+        view_name: builtins.str = ...,
+    ) -> None: ...
+    def ClearField(
+        self, field_name: typing_extensions.Literal["view_name", b"view_name"]
+    ) -> None: ...
+
+global___DropTempView = DropTempView
+
+class DropGlobalTempView(google.protobuf.message.Message):
+    """See `spark.catalog.dropGlobalTempView`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    VIEW_NAME_FIELD_NUMBER: builtins.int
+    view_name: builtins.str
+    """(Required)"""
+    def __init__(
+        self,
+        *,
+        view_name: builtins.str = ...,
+    ) -> None: ...
+    def ClearField(
+        self, field_name: typing_extensions.Literal["view_name", b"view_name"]
+    ) -> None: ...
+
+global___DropGlobalTempView = DropGlobalTempView
+
+class RecoverPartitions(google.protobuf.message.Message):
+    """See `spark.catalog.recoverPartitions`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    TABLE_NAME_FIELD_NUMBER: builtins.int
+    table_name: builtins.str
+    """(Required)"""
+    def __init__(
+        self,
+        *,
+        table_name: builtins.str = ...,
+    ) -> None: ...
+    def ClearField(
+        self, field_name: typing_extensions.Literal["table_name", 
b"table_name"]
+    ) -> None: ...
+
+global___RecoverPartitions = RecoverPartitions
+
+class ClearCache(google.protobuf.message.Message):
+    """TODO(SPARK-41612): Support Catalog.isCached
+    // See `spark.catalog.isCached`
+    message IsCached {
+     // (Required)
+     string table_name = 1;
+    }
+
+    TODO(SPARK-41600): Support Catalog.cacheTable
+    // See `spark.catalog.cacheTable`
+    message CacheTable {
+     // (Required)
+     string table_name = 1;
+    }
+
+    TODO(SPARK-41623): Support Catalog.uncacheTable
+    // See `spark.catalog.uncacheTable`
+    message UncacheTable {
+     // (Required)
+     string table_name = 1;
+    }
+
+    See `spark.catalog.clearCache`
+    """
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    def __init__(
+        self,
+    ) -> None: ...
+
+global___ClearCache = ClearCache
+
+class RefreshTable(google.protobuf.message.Message):
+    """See `spark.catalog.refreshTable`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    TABLE_NAME_FIELD_NUMBER: builtins.int
+    table_name: builtins.str
+    """(Required)"""
+    def __init__(
+        self,
+        *,
+        table_name: builtins.str = ...,
+    ) -> None: ...
+    def ClearField(
+        self, field_name: typing_extensions.Literal["table_name", 
b"table_name"]
+    ) -> None: ...
+
+global___RefreshTable = RefreshTable
+
+class RefreshByPath(google.protobuf.message.Message):
+    """See `spark.catalog.refreshByPath`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    PATH_FIELD_NUMBER: builtins.int
+    path: builtins.str
+    """(Required)"""
+    def __init__(
+        self,
+        *,
+        path: builtins.str = ...,
+    ) -> None: ...
+    def ClearField(self, field_name: typing_extensions.Literal["path", 
b"path"]) -> None: ...
+
+global___RefreshByPath = RefreshByPath
+
+class CurrentCatalog(google.protobuf.message.Message):
+    """See `spark.catalog.currentCatalog`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    def __init__(
+        self,
+    ) -> None: ...
+
+global___CurrentCatalog = CurrentCatalog
+
+class SetCurrentCatalog(google.protobuf.message.Message):
+    """See `spark.catalog.setCurrentCatalog`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    CATALOG_NAME_FIELD_NUMBER: builtins.int
+    catalog_name: builtins.str
+    """(Required)"""
+    def __init__(
+        self,
+        *,
+        catalog_name: builtins.str = ...,
+    ) -> None: ...
+    def ClearField(
+        self, field_name: typing_extensions.Literal["catalog_name", 
b"catalog_name"]
+    ) -> None: ...
+
+global___SetCurrentCatalog = SetCurrentCatalog
+
+class ListCatalogs(google.protobuf.message.Message):
+    """See `spark.catalog.listCatalogs`"""
+
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    def __init__(
+        self,
+    ) -> None: ...
+
+global___ListCatalogs = ListCatalogs
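
The stub exposes the proto3 `optional` fields through `HasField`/`WhichOneof` overloads, so type-checked client code can tell an unset `db_name` apart from an empty string. A small illustration using only the generated API:

    from pyspark.sql.connect.proto import catalog_pb2

    t = catalog_pb2.TableExists(table_name="t1")
    assert not t.HasField("db_name")   # never set
    t.db_name = ""
    assert t.HasField("db_name")       # explicitly set, even though empty
    assert t.WhichOneof("_db_name") == "db_name"
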
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py
index 6c9f9552e06..576a9b7ba1e 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -31,10 +31,11 @@ _sym_db = _symbol_database.Default()
 
 from pyspark.sql.connect.proto import expressions_pb2 as spark_dot_connect_dot_expressions__pb2
 from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__pb2
+from pyspark.sql.connect.proto import catalog_pb2 as spark_dot_connect_dot_catalog__pb2
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto"\xf7\x0e\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
 
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
 
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 
\x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66ilter\x18\x04
 \x01(\x0b\x32\x15.spark.connect.FilterH\x00R\x06\x66ilte [...]
+    
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\x85\x1c\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
 
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
 
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 
\x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66ilter\x18\x04
 \x01(\x0b\x32\x15.spa [...]
 )
 
 
@@ -525,90 +526,90 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _READ_DATASOURCE_OPTIONSENTRY._serialized_options = b"8\001"
     _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._options = None
     _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_options = b"8\001"
-    _RELATION._serialized_start = 109
-    _RELATION._serialized_end = 2020
-    _UNKNOWN._serialized_start = 2022
-    _UNKNOWN._serialized_end = 2031
-    _RELATIONCOMMON._serialized_start = 2033
-    _RELATIONCOMMON._serialized_end = 2082
-    _SQL._serialized_start = 2084
-    _SQL._serialized_end = 2111
-    _READ._serialized_start = 2114
-    _READ._serialized_end = 2540
-    _READ_NAMEDTABLE._serialized_start = 2256
-    _READ_NAMEDTABLE._serialized_end = 2317
-    _READ_DATASOURCE._serialized_start = 2320
-    _READ_DATASOURCE._serialized_end = 2527
-    _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 2458
-    _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 2516
-    _PROJECT._serialized_start = 2542
-    _PROJECT._serialized_end = 2659
-    _FILTER._serialized_start = 2661
-    _FILTER._serialized_end = 2773
-    _JOIN._serialized_start = 2776
-    _JOIN._serialized_end = 3247
-    _JOIN_JOINTYPE._serialized_start = 3039
-    _JOIN_JOINTYPE._serialized_end = 3247
-    _SETOPERATION._serialized_start = 3250
-    _SETOPERATION._serialized_end = 3646
-    _SETOPERATION_SETOPTYPE._serialized_start = 3509
-    _SETOPERATION_SETOPTYPE._serialized_end = 3623
-    _LIMIT._serialized_start = 3648
-    _LIMIT._serialized_end = 3724
-    _OFFSET._serialized_start = 3726
-    _OFFSET._serialized_end = 3805
-    _TAIL._serialized_start = 3807
-    _TAIL._serialized_end = 3882
-    _AGGREGATE._serialized_start = 3885
-    _AGGREGATE._serialized_end = 4467
-    _AGGREGATE_PIVOT._serialized_start = 4224
-    _AGGREGATE_PIVOT._serialized_end = 4335
-    _AGGREGATE_GROUPTYPE._serialized_start = 4338
-    _AGGREGATE_GROUPTYPE._serialized_end = 4467
-    _SORT._serialized_start = 4470
-    _SORT._serialized_end = 4630
-    _DROP._serialized_start = 4632
-    _DROP._serialized_end = 4732
-    _DEDUPLICATE._serialized_start = 4735
-    _DEDUPLICATE._serialized_end = 4906
-    _LOCALRELATION._serialized_start = 4909
-    _LOCALRELATION._serialized_end = 5046
-    _SAMPLE._serialized_start = 5049
-    _SAMPLE._serialized_end = 5344
-    _RANGE._serialized_start = 5347
-    _RANGE._serialized_end = 5492
-    _SUBQUERYALIAS._serialized_start = 5494
-    _SUBQUERYALIAS._serialized_end = 5608
-    _REPARTITION._serialized_start = 5611
-    _REPARTITION._serialized_end = 5753
-    _SHOWSTRING._serialized_start = 5756
-    _SHOWSTRING._serialized_end = 5898
-    _STATSUMMARY._serialized_start = 5900
-    _STATSUMMARY._serialized_end = 5992
-    _STATDESCRIBE._serialized_start = 5994
-    _STATDESCRIBE._serialized_end = 6075
-    _STATCROSSTAB._serialized_start = 6077
-    _STATCROSSTAB._serialized_end = 6178
-    _NAFILL._serialized_start = 6181
-    _NAFILL._serialized_end = 6315
-    _NADROP._serialized_start = 6318
-    _NADROP._serialized_end = 6452
-    _NAREPLACE._serialized_start = 6455
-    _NAREPLACE._serialized_end = 6751
-    _NAREPLACE_REPLACEMENT._serialized_start = 6610
-    _NAREPLACE_REPLACEMENT._serialized_end = 6751
-    _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_start = 6753
-    _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_end = 6867
-    _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_start = 6870
-    _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_end = 7129
-    _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_start = 7062
-    _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_end = 7129
-    _WITHCOLUMNS._serialized_start = 7132
-    _WITHCOLUMNS._serialized_end = 7263
-    _HINT._serialized_start = 7266
-    _HINT._serialized_end = 7406
-    _UNPIVOT._serialized_start = 7409
-    _UNPIVOT._serialized_end = 7655
-    _TOSCHEMA._serialized_start = 7657
-    _TOSCHEMA._serialized_end = 7763
+    _RELATION._serialized_start = 138
+    _RELATION._serialized_end = 3727
+    _UNKNOWN._serialized_start = 3729
+    _UNKNOWN._serialized_end = 3738
+    _RELATIONCOMMON._serialized_start = 3740
+    _RELATIONCOMMON._serialized_end = 3789
+    _SQL._serialized_start = 3791
+    _SQL._serialized_end = 3818
+    _READ._serialized_start = 3821
+    _READ._serialized_end = 4247
+    _READ_NAMEDTABLE._serialized_start = 3963
+    _READ_NAMEDTABLE._serialized_end = 4024
+    _READ_DATASOURCE._serialized_start = 4027
+    _READ_DATASOURCE._serialized_end = 4234
+    _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 4165
+    _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 4223
+    _PROJECT._serialized_start = 4249
+    _PROJECT._serialized_end = 4366
+    _FILTER._serialized_start = 4368
+    _FILTER._serialized_end = 4480
+    _JOIN._serialized_start = 4483
+    _JOIN._serialized_end = 4954
+    _JOIN_JOINTYPE._serialized_start = 4746
+    _JOIN_JOINTYPE._serialized_end = 4954
+    _SETOPERATION._serialized_start = 4957
+    _SETOPERATION._serialized_end = 5353
+    _SETOPERATION_SETOPTYPE._serialized_start = 5216
+    _SETOPERATION_SETOPTYPE._serialized_end = 5330
+    _LIMIT._serialized_start = 5355
+    _LIMIT._serialized_end = 5431
+    _OFFSET._serialized_start = 5433
+    _OFFSET._serialized_end = 5512
+    _TAIL._serialized_start = 5514
+    _TAIL._serialized_end = 5589
+    _AGGREGATE._serialized_start = 5592
+    _AGGREGATE._serialized_end = 6174
+    _AGGREGATE_PIVOT._serialized_start = 5931
+    _AGGREGATE_PIVOT._serialized_end = 6042
+    _AGGREGATE_GROUPTYPE._serialized_start = 6045
+    _AGGREGATE_GROUPTYPE._serialized_end = 6174
+    _SORT._serialized_start = 6177
+    _SORT._serialized_end = 6337
+    _DROP._serialized_start = 6339
+    _DROP._serialized_end = 6439
+    _DEDUPLICATE._serialized_start = 6442
+    _DEDUPLICATE._serialized_end = 6613
+    _LOCALRELATION._serialized_start = 6616
+    _LOCALRELATION._serialized_end = 6753
+    _SAMPLE._serialized_start = 6756
+    _SAMPLE._serialized_end = 7051
+    _RANGE._serialized_start = 7054
+    _RANGE._serialized_end = 7199
+    _SUBQUERYALIAS._serialized_start = 7201
+    _SUBQUERYALIAS._serialized_end = 7315
+    _REPARTITION._serialized_start = 7318
+    _REPARTITION._serialized_end = 7460
+    _SHOWSTRING._serialized_start = 7463
+    _SHOWSTRING._serialized_end = 7605
+    _STATSUMMARY._serialized_start = 7607
+    _STATSUMMARY._serialized_end = 7699
+    _STATDESCRIBE._serialized_start = 7701
+    _STATDESCRIBE._serialized_end = 7782
+    _STATCROSSTAB._serialized_start = 7784
+    _STATCROSSTAB._serialized_end = 7885
+    _NAFILL._serialized_start = 7888
+    _NAFILL._serialized_end = 8022
+    _NADROP._serialized_start = 8025
+    _NADROP._serialized_end = 8159
+    _NAREPLACE._serialized_start = 8162
+    _NAREPLACE._serialized_end = 8458
+    _NAREPLACE_REPLACEMENT._serialized_start = 8317
+    _NAREPLACE_REPLACEMENT._serialized_end = 8458
+    _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_start = 8460
+    _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_end = 8574
+    _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_start = 8577
+    _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_end = 8836
+    _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_start = 8769
+    _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_end = 8836
+    _WITHCOLUMNS._serialized_start = 8839
+    _WITHCOLUMNS._serialized_end = 8970
+    _HINT._serialized_start = 8973
+    _HINT._serialized_end = 9113
+    _UNPIVOT._serialized_start = 9116
+    _UNPIVOT._serialized_end = 9362
+    _TOSCHEMA._serialized_start = 9364
+    _TOSCHEMA._serialized_end = 9470
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index 57f1177d885..22a0c778a3b 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -39,6 +39,7 @@ import google.protobuf.descriptor
 import google.protobuf.internal.containers
 import google.protobuf.internal.enum_type_wrapper
 import google.protobuf.message
+import pyspark.sql.connect.proto.catalog_pb2
 import pyspark.sql.connect.proto.expressions_pb2
 import pyspark.sql.connect.proto.types_pb2
 import sys
@@ -92,6 +93,29 @@ class Relation(google.protobuf.message.Message):
     SUMMARY_FIELD_NUMBER: builtins.int
     CROSSTAB_FIELD_NUMBER: builtins.int
     DESCRIBE_FIELD_NUMBER: builtins.int
+    CURRENT_DATABASE_FIELD_NUMBER: builtins.int
+    SET_CURRENT_DATABASE_FIELD_NUMBER: builtins.int
+    LIST_DATABASES_FIELD_NUMBER: builtins.int
+    LIST_TABLES_FIELD_NUMBER: builtins.int
+    LIST_FUNCTIONS_FIELD_NUMBER: builtins.int
+    LIST_COLUMNS_FIELD_NUMBER: builtins.int
+    GET_DATABASE_FIELD_NUMBER: builtins.int
+    GET_TABLE_FIELD_NUMBER: builtins.int
+    GET_FUNCTION_FIELD_NUMBER: builtins.int
+    DATABASE_EXISTS_FIELD_NUMBER: builtins.int
+    TABLE_EXISTS_FIELD_NUMBER: builtins.int
+    FUNCTION_EXISTS_FIELD_NUMBER: builtins.int
+    CREATE_EXTERNAL_TABLE_FIELD_NUMBER: builtins.int
+    CREATE_TABLE_FIELD_NUMBER: builtins.int
+    DROP_TEMP_VIEW_FIELD_NUMBER: builtins.int
+    DROP_GLOBAL_TEMP_VIEW_FIELD_NUMBER: builtins.int
+    RECOVER_PARTITIONS_FIELD_NUMBER: builtins.int
+    CLEAR_CACHE_FIELD_NUMBER: builtins.int
+    REFRESH_TABLE_FIELD_NUMBER: builtins.int
+    REFRESH_BY_PATH_FIELD_NUMBER: builtins.int
+    CURRENT_CATALOG_FIELD_NUMBER: builtins.int
+    SET_CURRENT_CATALOG_FIELD_NUMBER: builtins.int
+    LIST_CATALOGS_FIELD_NUMBER: builtins.int
     UNKNOWN_FIELD_NUMBER: builtins.int
     @property
     def common(self) -> global___RelationCommon: ...
@@ -160,6 +184,62 @@ class Relation(google.protobuf.message.Message):
     @property
     def describe(self) -> global___StatDescribe: ...
     @property
+    def current_database(self) -> pyspark.sql.connect.proto.catalog_pb2.CurrentDatabase:
+        """Catalog API (internal-only)"""
+    @property
+    def set_current_database(self) -> pyspark.sql.connect.proto.catalog_pb2.SetCurrentDatabase: ...
+    @property
+    def list_databases(self) -> pyspark.sql.connect.proto.catalog_pb2.ListDatabases: ...
+    @property
+    def list_tables(self) -> pyspark.sql.connect.proto.catalog_pb2.ListTables: ...
+    @property
+    def list_functions(self) -> pyspark.sql.connect.proto.catalog_pb2.ListFunctions: ...
+    @property
+    def list_columns(self) -> pyspark.sql.connect.proto.catalog_pb2.ListColumns: ...
+    @property
+    def get_database(self) -> pyspark.sql.connect.proto.catalog_pb2.GetDatabase: ...
+    @property
+    def get_table(self) -> pyspark.sql.connect.proto.catalog_pb2.GetTable: ...
+    @property
+    def get_function(self) -> pyspark.sql.connect.proto.catalog_pb2.GetFunction: ...
+    @property
+    def database_exists(self) -> pyspark.sql.connect.proto.catalog_pb2.DatabaseExists: ...
+    @property
+    def table_exists(self) -> pyspark.sql.connect.proto.catalog_pb2.TableExists: ...
+    @property
+    def function_exists(self) -> pyspark.sql.connect.proto.catalog_pb2.FunctionExists: ...
+    @property
+    def create_external_table(
+        self,
+    ) -> pyspark.sql.connect.proto.catalog_pb2.CreateExternalTable: ...
+    @property
+    def create_table(self) -> pyspark.sql.connect.proto.catalog_pb2.CreateTable: ...
+    @property
+    def drop_temp_view(self) -> pyspark.sql.connect.proto.catalog_pb2.DropTempView: ...
+    @property
+    def drop_global_temp_view(self) -> pyspark.sql.connect.proto.catalog_pb2.DropGlobalTempView: ...
+    @property
+    def recover_partitions(self) -> pyspark.sql.connect.proto.catalog_pb2.RecoverPartitions: ...
+    @property
+    def clear_cache(self) -> pyspark.sql.connect.proto.catalog_pb2.ClearCache:
+        """TODO(SPARK-41612): Support Catalog.isCached
+           IsCached is_cached = 218;
+        TODO(SPARK-41600): Support Catalog.cacheTable
+           CacheTable cache_table = 219;
+        TODO(SPARK-41623): Support Catalog.uncacheTable
+           UncacheTable uncache_table = 220;
+        """
+    @property
+    def refresh_table(self) -> pyspark.sql.connect.proto.catalog_pb2.RefreshTable: ...
+    @property
+    def refresh_by_path(self) -> pyspark.sql.connect.proto.catalog_pb2.RefreshByPath: ...
+    @property
+    def current_catalog(self) -> pyspark.sql.connect.proto.catalog_pb2.CurrentCatalog: ...
+    @property
+    def set_current_catalog(self) -> pyspark.sql.connect.proto.catalog_pb2.SetCurrentCatalog: ...
+    @property
+    def list_catalogs(self) -> pyspark.sql.connect.proto.catalog_pb2.ListCatalogs: ...
+    @property
     def unknown(self) -> global___Unknown: ...
     def __init__(
         self,
@@ -196,6 +276,31 @@ class Relation(google.protobuf.message.Message):
         summary: global___StatSummary | None = ...,
         crosstab: global___StatCrosstab | None = ...,
         describe: global___StatDescribe | None = ...,
+        current_database: pyspark.sql.connect.proto.catalog_pb2.CurrentDatabase | None = ...,
+        set_current_database: pyspark.sql.connect.proto.catalog_pb2.SetCurrentDatabase | None = ...,
+        list_databases: pyspark.sql.connect.proto.catalog_pb2.ListDatabases | None = ...,
+        list_tables: pyspark.sql.connect.proto.catalog_pb2.ListTables | None = ...,
+        list_functions: pyspark.sql.connect.proto.catalog_pb2.ListFunctions | None = ...,
+        list_columns: pyspark.sql.connect.proto.catalog_pb2.ListColumns | None = ...,
+        get_database: pyspark.sql.connect.proto.catalog_pb2.GetDatabase | None = ...,
+        get_table: pyspark.sql.connect.proto.catalog_pb2.GetTable | None = ...,
+        get_function: pyspark.sql.connect.proto.catalog_pb2.GetFunction | None = ...,
+        database_exists: pyspark.sql.connect.proto.catalog_pb2.DatabaseExists | None = ...,
+        table_exists: pyspark.sql.connect.proto.catalog_pb2.TableExists | None = ...,
+        function_exists: pyspark.sql.connect.proto.catalog_pb2.FunctionExists | None = ...,
+        create_external_table: pyspark.sql.connect.proto.catalog_pb2.CreateExternalTable
+        | None = ...,
+        create_table: pyspark.sql.connect.proto.catalog_pb2.CreateTable | None = ...,
+        drop_temp_view: pyspark.sql.connect.proto.catalog_pb2.DropTempView | None = ...,
+        drop_global_temp_view: pyspark.sql.connect.proto.catalog_pb2.DropGlobalTempView
+        | None = ...,
+        recover_partitions: pyspark.sql.connect.proto.catalog_pb2.RecoverPartitions | None = ...,
+        clear_cache: pyspark.sql.connect.proto.catalog_pb2.ClearCache | None = ...,
+        refresh_table: pyspark.sql.connect.proto.catalog_pb2.RefreshTable | None = ...,
+        refresh_by_path: pyspark.sql.connect.proto.catalog_pb2.RefreshByPath | None = ...,
+        current_catalog: pyspark.sql.connect.proto.catalog_pb2.CurrentCatalog | None = ...,
+        set_current_catalog: pyspark.sql.connect.proto.catalog_pb2.SetCurrentCatalog | None = ...,
+        list_catalogs: pyspark.sql.connect.proto.catalog_pb2.ListCatalogs | None = ...,
         unknown: global___Unknown | None = ...,
     ) -> None: ...
     def HasField(
@@ -203,28 +308,62 @@ class Relation(google.protobuf.message.Message):
         field_name: typing_extensions.Literal[
             "aggregate",
             b"aggregate",
+            "clear_cache",
+            b"clear_cache",
             "common",
             b"common",
+            "create_external_table",
+            b"create_external_table",
+            "create_table",
+            b"create_table",
             "crosstab",
             b"crosstab",
+            "current_catalog",
+            b"current_catalog",
+            "current_database",
+            b"current_database",
+            "database_exists",
+            b"database_exists",
             "deduplicate",
             b"deduplicate",
             "describe",
             b"describe",
             "drop",
             b"drop",
+            "drop_global_temp_view",
+            b"drop_global_temp_view",
             "drop_na",
             b"drop_na",
+            "drop_temp_view",
+            b"drop_temp_view",
             "fill_na",
             b"fill_na",
             "filter",
             b"filter",
+            "function_exists",
+            b"function_exists",
+            "get_database",
+            b"get_database",
+            "get_function",
+            b"get_function",
+            "get_table",
+            b"get_table",
             "hint",
             b"hint",
             "join",
             b"join",
             "limit",
             b"limit",
+            "list_catalogs",
+            b"list_catalogs",
+            "list_columns",
+            b"list_columns",
+            "list_databases",
+            b"list_databases",
+            "list_functions",
+            b"list_functions",
+            "list_tables",
+            b"list_tables",
             "local_relation",
             b"local_relation",
             "offset",
@@ -235,6 +374,12 @@ class Relation(google.protobuf.message.Message):
             b"range",
             "read",
             b"read",
+            "recover_partitions",
+            b"recover_partitions",
+            "refresh_by_path",
+            b"refresh_by_path",
+            "refresh_table",
+            b"refresh_table",
             "rel_type",
             b"rel_type",
             "rename_columns_by_name_to_name_map",
@@ -247,6 +392,10 @@ class Relation(google.protobuf.message.Message):
             b"replace",
             "sample",
             b"sample",
+            "set_current_catalog",
+            b"set_current_catalog",
+            "set_current_database",
+            b"set_current_database",
             "set_op",
             b"set_op",
             "show_string",
@@ -259,6 +408,8 @@ class Relation(google.protobuf.message.Message):
             b"subquery_alias",
             "summary",
             b"summary",
+            "table_exists",
+            b"table_exists",
             "tail",
             b"tail",
             "to_schema",
@@ -276,28 +427,62 @@ class Relation(google.protobuf.message.Message):
         field_name: typing_extensions.Literal[
             "aggregate",
             b"aggregate",
+            "clear_cache",
+            b"clear_cache",
             "common",
             b"common",
+            "create_external_table",
+            b"create_external_table",
+            "create_table",
+            b"create_table",
             "crosstab",
             b"crosstab",
+            "current_catalog",
+            b"current_catalog",
+            "current_database",
+            b"current_database",
+            "database_exists",
+            b"database_exists",
             "deduplicate",
             b"deduplicate",
             "describe",
             b"describe",
             "drop",
             b"drop",
+            "drop_global_temp_view",
+            b"drop_global_temp_view",
             "drop_na",
             b"drop_na",
+            "drop_temp_view",
+            b"drop_temp_view",
             "fill_na",
             b"fill_na",
             "filter",
             b"filter",
+            "function_exists",
+            b"function_exists",
+            "get_database",
+            b"get_database",
+            "get_function",
+            b"get_function",
+            "get_table",
+            b"get_table",
             "hint",
             b"hint",
             "join",
             b"join",
             "limit",
             b"limit",
+            "list_catalogs",
+            b"list_catalogs",
+            "list_columns",
+            b"list_columns",
+            "list_databases",
+            b"list_databases",
+            "list_functions",
+            b"list_functions",
+            "list_tables",
+            b"list_tables",
             "local_relation",
             b"local_relation",
             "offset",
@@ -308,6 +493,12 @@ class Relation(google.protobuf.message.Message):
             b"range",
             "read",
             b"read",
+            "recover_partitions",
+            b"recover_partitions",
+            "refresh_by_path",
+            b"refresh_by_path",
+            "refresh_table",
+            b"refresh_table",
             "rel_type",
             b"rel_type",
             "rename_columns_by_name_to_name_map",
@@ -320,6 +511,10 @@ class Relation(google.protobuf.message.Message):
             b"replace",
             "sample",
             b"sample",
+            "set_current_catalog",
+            b"set_current_catalog",
+            "set_current_database",
+            b"set_current_database",
             "set_op",
             b"set_op",
             "show_string",
@@ -332,6 +527,8 @@ class Relation(google.protobuf.message.Message):
             b"subquery_alias",
             "summary",
             b"summary",
+            "table_exists",
+            b"table_exists",
             "tail",
             b"tail",
             "to_schema",
@@ -378,6 +575,29 @@ class Relation(google.protobuf.message.Message):
         "summary",
         "crosstab",
         "describe",
+        "current_database",
+        "set_current_database",
+        "list_databases",
+        "list_tables",
+        "list_functions",
+        "list_columns",
+        "get_database",
+        "get_table",
+        "get_function",
+        "database_exists",
+        "table_exists",
+        "function_exists",
+        "create_external_table",
+        "create_table",
+        "drop_temp_view",
+        "drop_global_temp_view",
+        "recover_partitions",
+        "clear_cache",
+        "refresh_table",
+        "refresh_by_path",
+        "current_catalog",
+        "set_current_catalog",
+        "list_catalogs",
         "unknown",
     ] | None: ...
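
    For illustration only (not part of this commit): as the updated stubs above show, the catalog
    operations are simply new variants of the Relation "rel_type" oneof, so a client-side plan can
    carry them like any other relation. A minimal sketch, assuming the generated modules are
    importable:

        from pyspark.sql.connect.proto import relations_pb2

        rel = relations_pb2.Relation()
        rel.current_database.SetInParent()  # parameterless catalog relation
        assert rel.WhichOneof("rel_type") == "current_database"
        assert rel.HasField("current_database")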
 
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index f003a1244db..a220e3824ea 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -46,6 +46,7 @@ from typing import (
 
 if TYPE_CHECKING:
     from pyspark.sql.connect._typing import OptionalPrimitiveType
+    from pyspark.sql.connect.catalog import Catalog
 
 
 class SparkSession(object):
@@ -120,6 +121,11 @@ class SparkSession(object):
         # Parse the connection string.
         self._client = SparkConnectClient(connectionString)
 
+    def table(self, tableName: str) -> DataFrame:
+        return self.read.table(tableName)
+
+    table.__doc__ = PySparkSession.table.__doc__
+
     @property
     def read(self) -> "DataFrameReader":
         return DataFrameReader(self)
@@ -224,6 +230,16 @@ class SparkSession(object):
 
     range.__doc__ = PySparkSession.__doc__
 
+    @property
+    def catalog(self) -> "Catalog":
+        from pyspark.sql.connect.catalog import Catalog
+
+        if not hasattr(self, "_catalog"):
+            self._catalog = Catalog(self)
+        return self._catalog
+
+    catalog.__doc__ = PySparkSession.__doc__
+
     # SparkConnect-specific API
     @property
     def client(self) -> "SparkConnectClient":
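
    For illustration only (not part of this commit): with "table" and "catalog" now on the Spark
    Connect SparkSession, client code can follow the same shape as classic PySpark. A minimal
    usage sketch, assuming a Spark Connect server is reachable at sc://localhost (set up the way
    the parity test below does it):

        from pyspark.sql import SparkSession

        spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
        print(spark.catalog.currentDatabase())  # a plain string, e.g. "default"
        if spark.catalog.tableExists("tab1"):   # a plain bool
            spark.table("tab1").show()          # spark.table is an alias of spark.read.table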
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index de93b34a272..7ecb7e61fb6 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -666,6 +666,9 @@ class SparkSession(SparkConversionMixin):
 
         .. versionadded:: 2.0.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Returns
         -------
         :class:`Catalog`
@@ -1411,6 +1414,9 @@ class SparkSession(SparkConversionMixin):
 
         .. versionadded:: 2.0.0
 
+        .. versionchanged:: 3.4.0
+            Support Spark Connect.
+
         Parameters
         ----------
         tableName : str
diff --git a/python/pyspark/sql/tests/connect/test_parity_catalog.py b/python/pyspark/sql/tests/connect/test_parity_catalog.py
new file mode 100644
index 00000000000..b832c5e1ef9
--- /dev/null
+++ b/python/pyspark/sql/tests/connect/test_parity_catalog.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import os
+
+from pyspark.sql import SparkSession
+from pyspark.sql.tests.test_catalog import CatalogTestsMixin
+from pyspark.testing.connectutils import should_test_connect, connect_requirement_message
+from pyspark.testing.sqlutils import ReusedSQLTestCase
+
+
+@unittest.skipIf(not should_test_connect, connect_requirement_message)
+class CatalogParityTests(CatalogTestsMixin, ReusedSQLTestCase):
+    @classmethod
+    def setUpClass(cls):
+        super(CatalogParityTests, cls).setUpClass()
+        cls._spark = cls.spark  # Assign existing Spark session to run the server
+        # Sets the remote address. Now, we create a remote Spark Session.
+        # Note that this is only allowed in testing.
+        os.environ["SPARK_REMOTE"] = "sc://localhost"
+        cls.spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
+
+    @classmethod
+    def tearDownClass(cls):
+        # TODO(SPARK-41529): Implement stop in RemoteSparkSession.
+        #  Stop the regular Spark session (server) too.
+        cls.spark = cls._spark
+        super(CatalogParityTests, cls).tearDownClass()
+        del os.environ["SPARK_REMOTE"]
+
+    # TODO(SPARK-41612): Support Catalog.isCached
+    # TODO(SPARK-41600): Support Catalog.cacheTable
+    # TODO(SPARK-41623): Support Catalog.uncacheTable
+    @unittest.skip("Fails in Spark Connect, should enable.")
+    def test_table_cache(self):
+        super().test_table_cache()
+
+    # TODO(SPARK-41600): Support Catalog.cacheTable
+    @unittest.skip("Fails in Spark Connect, should enable.")
+    def test_refresh_table(self):
+        super().test_refresh_table()
+
+
+if __name__ == "__main__":
+    import unittest
+    from pyspark.sql.tests.connect.test_parity_catalog import *  # noqa: F401
+
+    try:
+        import xmlrunner  # type: ignore[import]
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/sql/tests/test_catalog.py b/python/pyspark/sql/tests/test_catalog.py
index 2eccfab72fd..d60d53e5877 100644
--- a/python/pyspark/sql/tests/test_catalog.py
+++ b/python/pyspark/sql/tests/test_catalog.py
@@ -16,20 +16,21 @@
 #
 
 from pyspark.sql.types import StructType, StructField, IntegerType
-from pyspark.sql.utils import AnalysisException
 from pyspark.testing.sqlutils import ReusedSQLTestCase
 
 
-class CatalogTests(ReusedSQLTestCase):
+class CatalogTestsMixin:
     def test_current_database(self):
         spark = self.spark
         with self.database("some_db"):
             self.assertEqual(spark.catalog.currentDatabase(), "default")
-            spark.sql("CREATE DATABASE some_db")
+            spark.sql("CREATE DATABASE some_db").collect()
             spark.catalog.setCurrentDatabase("some_db")
             self.assertEqual(spark.catalog.currentDatabase(), "some_db")
             self.assertRaisesRegex(
-                AnalysisException,
+                # TODO(SPARK-41715): Should catch specific exceptions for both
+                #  Spark Connect and PySpark
+                Exception,
                 "does_not_exist",
                 lambda: spark.catalog.setCurrentDatabase("does_not_exist"),
             )
@@ -39,7 +40,7 @@ class CatalogTests(ReusedSQLTestCase):
         with self.database("some_db"):
             databases = [db.name for db in spark.catalog.listDatabases()]
             self.assertEqual(databases, ["default"])
-            spark.sql("CREATE DATABASE some_db")
+            spark.sql("CREATE DATABASE some_db").collect()
             databases = [db.name for db in spark.catalog.listDatabases()]
             self.assertEqual(sorted(databases), ["default", "some_db"])
 
@@ -48,7 +49,7 @@ class CatalogTests(ReusedSQLTestCase):
         spark = self.spark
         with self.database("some_db"):
             self.assertFalse(spark.catalog.databaseExists("some_db"))
-            spark.sql("CREATE DATABASE some_db")
+            spark.sql("CREATE DATABASE some_db").collect()
             self.assertTrue(spark.catalog.databaseExists("some_db"))
             
self.assertTrue(spark.catalog.databaseExists("spark_catalog.some_db"))
             
self.assertFalse(spark.catalog.databaseExists("spark_catalog.some_db2"))
@@ -56,7 +57,7 @@ class CatalogTests(ReusedSQLTestCase):
     def test_get_database(self):
         spark = self.spark
         with self.database("some_db"):
-            spark.sql("CREATE DATABASE some_db")
+            spark.sql("CREATE DATABASE some_db").collect()
             db = spark.catalog.getDatabase("spark_catalog.some_db")
             self.assertEqual(db.name, "some_db")
             self.assertEqual(db.catalog, "spark_catalog")
@@ -66,14 +67,16 @@ class CatalogTests(ReusedSQLTestCase):
 
         spark = self.spark
         with self.database("some_db"):
-            spark.sql("CREATE DATABASE some_db")
+            spark.sql("CREATE DATABASE some_db").collect()
             with self.table("tab1", "some_db.tab2", "tab3_via_catalog"):
                 with self.tempView("temp_tab"):
                     self.assertEqual(spark.catalog.listTables(), [])
                     self.assertEqual(spark.catalog.listTables("some_db"), [])
                     spark.createDataFrame([(1, 1)]).createOrReplaceTempView("temp_tab")
-                    spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING parquet")
-                    spark.sql("CREATE TABLE some_db.tab2 (name STRING, age INT) USING parquet")
+                    spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING parquet").collect()
+                    spark.sql(
+                        "CREATE TABLE some_db.tab2 (name STRING, age INT) USING parquet"
+                    ).collect()
 
                     schema = StructType([StructField("a", IntegerType(), True)])
                     description = "this a table created via Catalog.createTable()"
@@ -178,7 +181,7 @@ class CatalogTests(ReusedSQLTestCase):
                         )
                     )
                     self.assertRaisesRegex(
-                        AnalysisException,
+                        Exception,
                         "does_not_exist",
                         lambda: spark.catalog.listTables("does_not_exist"),
                     )
@@ -186,7 +189,7 @@ class CatalogTests(ReusedSQLTestCase):
     def test_list_functions(self):
         spark = self.spark
         with self.database("some_db"):
-            spark.sql("CREATE DATABASE some_db")
+            spark.sql("CREATE DATABASE some_db").collect()
             functions = dict((f.name, f) for f in spark.catalog.listFunctions())
             functionsDefault = dict((f.name, f) for f in spark.catalog.listFunctions("default"))
             self.assertTrue(len(functions) > 200)
@@ -206,23 +209,28 @@ class CatalogTests(ReusedSQLTestCase):
             self.assertEqual(functions, functionsDefault)
 
             with self.function("func1", "some_db.func2"):
-                spark.udf.register("temp_func", lambda x: str(x))
-                spark.sql("CREATE FUNCTION func1 AS 
'org.apache.spark.data.bricks'")
-                spark.sql("CREATE FUNCTION some_db.func2 AS 
'org.apache.spark.data.bricks'")
+                if hasattr(spark, "udf"):
+                    spark.udf.register("temp_func", lambda x: str(x))
+                spark.sql("CREATE FUNCTION func1 AS 
'org.apache.spark.data.bricks'").collect()
+                spark.sql(
+                    "CREATE FUNCTION some_db.func2 AS 
'org.apache.spark.data.bricks'"
+                ).collect()
                 newFunctions = dict((f.name, f) for f in 
spark.catalog.listFunctions())
                 newFunctionsSomeDb = dict(
                     (f.name, f) for f in spark.catalog.listFunctions("some_db")
                 )
                 self.assertTrue(set(functions).issubset(set(newFunctions)))
                 
self.assertTrue(set(functions).issubset(set(newFunctionsSomeDb)))
-                self.assertTrue("temp_func" in newFunctions)
+                if hasattr(spark, "udf"):
+                    self.assertTrue("temp_func" in newFunctions)
                 self.assertTrue("func1" in newFunctions)
                 self.assertTrue("func2" not in newFunctions)
-                self.assertTrue("temp_func" in newFunctionsSomeDb)
+                if hasattr(spark, "udf"):
+                    self.assertTrue("temp_func" in newFunctionsSomeDb)
                 self.assertTrue("func1" not in newFunctionsSomeDb)
                 self.assertTrue("func2" in newFunctionsSomeDb)
                 self.assertRaisesRegex(
-                    AnalysisException,
+                    Exception,
                     "does_not_exist",
                     lambda: spark.catalog.listFunctions("does_not_exist"),
                 )
@@ -235,7 +243,7 @@ class CatalogTests(ReusedSQLTestCase):
             self.assertFalse(spark.catalog.functionExists("default.func1"))
             
self.assertFalse(spark.catalog.functionExists("spark_catalog.default.func1"))
             self.assertFalse(spark.catalog.functionExists("func1", "default"))
-            spark.sql("CREATE FUNCTION func1 AS 
'org.apache.spark.data.bricks'")
+            spark.sql("CREATE FUNCTION func1 AS 
'org.apache.spark.data.bricks'").collect()
             self.assertTrue(spark.catalog.functionExists("func1"))
             self.assertTrue(spark.catalog.functionExists("default.func1"))
             
self.assertTrue(spark.catalog.functionExists("spark_catalog.default.func1"))
@@ -244,7 +252,7 @@ class CatalogTests(ReusedSQLTestCase):
     def test_get_function(self):
         spark = self.spark
         with self.function("func1"):
-            spark.sql("CREATE FUNCTION func1 AS 
'org.apache.spark.data.bricks'")
+            spark.sql("CREATE FUNCTION func1 AS 
'org.apache.spark.data.bricks'").collect()
             func1 = spark.catalog.getFunction("spark_catalog.default.func1")
             self.assertTrue(func1.name == "func1")
             self.assertTrue(func1.namespace == ["default"])
@@ -257,12 +265,12 @@ class CatalogTests(ReusedSQLTestCase):
 
         spark = self.spark
         with self.database("some_db"):
-            spark.sql("CREATE DATABASE some_db")
+            spark.sql("CREATE DATABASE some_db").collect()
             with self.table("tab1", "some_db.tab2"):
-                spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING 
parquet")
+                spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING 
parquet").collect()
                 spark.sql(
                     "CREATE TABLE some_db.tab2 (nickname STRING, tolerance 
FLOAT) USING parquet"
-                )
+                ).collect()
                 columns = sorted(
                     spark.catalog.listColumns("spark_catalog.default.tab1"), 
key=lambda c: c.name
                 )
@@ -319,11 +327,9 @@ class CatalogTests(ReusedSQLTestCase):
                         isBucket=False,
                     ),
                 )
+                self.assertRaisesRegex(Exception, "tab2", lambda: spark.catalog.listColumns("tab2"))
                 self.assertRaisesRegex(
-                    AnalysisException, "tab2", lambda: spark.catalog.listColumns("tab2")
-                )
-                self.assertRaisesRegex(
-                    AnalysisException,
+                    Exception,
                     "does_not_exist",
                     lambda: spark.catalog.listColumns("does_not_exist"),
                 )
@@ -331,9 +337,11 @@ class CatalogTests(ReusedSQLTestCase):
     def test_table_cache(self):
         spark = self.spark
         with self.database("some_db"):
-            spark.sql("CREATE DATABASE some_db")
+            spark.sql("CREATE DATABASE some_db").collect()
             with self.table("tab1"):
-                spark.sql("CREATE TABLE some_db.tab1 (name STRING, age INT) 
USING parquet")
+                spark.sql(
+                    "CREATE TABLE some_db.tab1 (name STRING, age INT) USING 
parquet"
+                ).collect()
                 self.assertFalse(spark.catalog.isCached("some_db.tab1"))
                 
self.assertFalse(spark.catalog.isCached("spark_catalog.some_db.tab1"))
                 spark.catalog.cacheTable("spark_catalog.some_db.tab1")
@@ -347,16 +355,18 @@ class CatalogTests(ReusedSQLTestCase):
         # SPARK-36176: testing that table_exists returns correct boolean
         spark = self.spark
         with self.database("some_db"):
-            spark.sql("CREATE DATABASE some_db")
+            spark.sql("CREATE DATABASE some_db").collect()
             with self.table("tab1", "some_db.tab2"):
                 self.assertFalse(spark.catalog.tableExists("tab1"))
                 self.assertFalse(spark.catalog.tableExists("tab2", "some_db"))
-                spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING 
parquet")
+                spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING 
parquet").collect()
                 self.assertTrue(spark.catalog.tableExists("tab1"))
                 self.assertTrue(spark.catalog.tableExists("default.tab1"))
                 
self.assertTrue(spark.catalog.tableExists("spark_catalog.default.tab1"))
                 self.assertTrue(spark.catalog.tableExists("tab1", "default"))
-                spark.sql("CREATE TABLE some_db.tab2 (name STRING, age INT) 
USING parquet")
+                spark.sql(
+                    "CREATE TABLE some_db.tab2 (name STRING, age INT) USING 
parquet"
+                ).collect()
                 self.assertFalse(spark.catalog.tableExists("tab2"))
                 self.assertTrue(spark.catalog.tableExists("some_db.tab2"))
                 
self.assertTrue(spark.catalog.tableExists("spark_catalog.some_db.tab2"))
@@ -365,9 +375,9 @@ class CatalogTests(ReusedSQLTestCase):
     def test_get_table(self):
         spark = self.spark
         with self.database("some_db"):
-            spark.sql("CREATE DATABASE some_db")
+            spark.sql("CREATE DATABASE some_db").collect()
             with self.table("tab1"):
-                spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING 
parquet")
+                spark.sql("CREATE TABLE tab1 (name STRING, age INT) USING 
parquet").collect()
                 self.assertEqual(spark.catalog.getTable("tab1").database, 
"default")
                 
self.assertEqual(spark.catalog.getTable("default.tab1").catalog, 
"spark_catalog")
                 
self.assertEqual(spark.catalog.getTable("spark_catalog.default.tab1").name, 
"tab1")
@@ -381,8 +391,8 @@ class CatalogTests(ReusedSQLTestCase):
             with self.table("my_tab"):
                 spark.sql(
                     "CREATE TABLE my_tab (col STRING) USING TEXT LOCATION 
'{}'".format(tmp_dir)
-                )
-                spark.sql("INSERT INTO my_tab SELECT 'abc'")
+                ).collect()
+                spark.sql("INSERT INTO my_tab SELECT 'abc'").collect()
                 spark.catalog.cacheTable("my_tab")
                 self.assertEqual(spark.table("my_tab").count(), 1)
 
@@ -393,6 +403,10 @@ class CatalogTests(ReusedSQLTestCase):
                 self.assertEqual(spark.table("my_tab").count(), 0)
 
 
+class CatalogTests(ReusedSQLTestCase):
+    pass
+
+
 if __name__ == "__main__":
     import unittest
     from pyspark.sql.tests.test_catalog import *  # noqa: F401
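
    For illustration only (not part of this commit): the shared tests above now append .collect()
    to DDL statements, presumably because spark.sql() on Spark Connect only builds a plan until an
    action runs it, and they catch the broad Exception until SPARK-41715 pins the concrete error
    types for both backends. A backend-agnostic assertion, assuming "spark" is a session from
    either backend, would look like:

        spark.sql("CREATE DATABASE some_db").collect()  # .collect() forces the DDL to run
        try:
            spark.catalog.setCurrentDatabase("does_not_exist")
        except Exception as e:  # AnalysisException on classic PySpark
            assert "does_not_exist" in str(e)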
diff --git a/python/pyspark/testing/sqlutils.py b/python/pyspark/testing/sqlutils.py
index 92fcd08091c..b5ad7b64f91 100644
--- a/python/pyspark/testing/sqlutils.py
+++ b/python/pyspark/testing/sqlutils.py
@@ -202,7 +202,7 @@ class SQLTestUtils:
             yield
         finally:
             for db in databases:
-                self.spark.sql("DROP DATABASE IF EXISTS %s CASCADE" % db)
+                self.spark.sql("DROP DATABASE IF EXISTS %s CASCADE" % 
db).collect()
             self.spark.catalog.setCurrentDatabase("default")
 
     @contextmanager
@@ -217,7 +217,7 @@ class SQLTestUtils:
             yield
         finally:
             for t in tables:
-                self.spark.sql("DROP TABLE IF EXISTS %s" % t)
+                self.spark.sql("DROP TABLE IF EXISTS %s" % t).collect()
 
     @contextmanager
     def tempView(self, *views):
@@ -245,7 +245,7 @@ class SQLTestUtils:
             yield
         finally:
             for f in functions:
-                self.spark.sql("DROP FUNCTION IF EXISTS %s" % f)
+                self.spark.sql("DROP FUNCTION IF EXISTS %s" % f).collect()
 
     @staticmethod
     def assert_close(a, b):

