This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4074293ac66 [SPARK-41629][CONNECT] Support for Protocol Extensions in Relation and Expression
4074293ac66 is described below

commit 4074293ac663c6df314e55e829362cc18eff4518
Author: Martin Grund <[email protected]>
AuthorDate: Fri Dec 30 17:11:18 2022 +0800

    [SPARK-41629][CONNECT] Support for Protocol Extensions in Relation and Expression
    
    ### What changes were proposed in this pull request?
    
    This PR adds an extension mechanism to the Spark Connect protocol for
    custom `Relation`, `Expression`, and `Command` types. This is necessary to
    support Spark extensions such as Delta or custom Catalyst plugins.
    
    This is achieved by adding `protobuf.Any` fields in `Relation`, `Command`,
    and `Expression`. To load the extensions, this PR adds three configuration
    options that indicate which plugin classes should be loaded (a minimal
    sketch of setting them follows the list):
    
      * `spark.connect.extensions.relation.classes`
      * `spark.connect.extensions.expression.classes`
      * `spark.connect.extensions.command.classes`
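
    As a sketch only, assuming the example plugin classes used in
    `SparkConnectPluginRegistrySuite` are on the server classpath, the options
    can be set on the server's `SparkConf` roughly like this:

    ```scala
    import org.apache.spark.SparkConf

    // Register the example plugins; any fully qualified class names that
    // implement the plugin traits work the same way. Multiple classes are
    // given as a comma-separated list.
    val conf = new SparkConf()
      .set(
        "spark.connect.extensions.relation.classes",
        "org.apache.spark.sql.connect.plugin.ExampleRelationPlugin")
      .set(
        "spark.connect.extensions.expression.classes",
        "org.apache.spark.sql.connect.plugin.ExampleExpressionPlugin")
      .set(
        "spark.connect.extensions.command.classes",
        "org.apache.spark.sql.connect.plugin.ExampleCommandPlugin")
    ```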
    
    To add a new plugin, consumers implement `RelationPlugin`, `ExpressionPlugin`,
    or `CommandPlugin` and override the corresponding `transform()` method (or
    `process()` for commands). If the plugin does not support the given input,
    it must return `None`.
    
    Below is a simplified example of an expression and relation plugin.
    
    First, define the custom message types that are necessary for the
    particular input.
    
    ```proto
    message ExamplePluginRelation {
      Relation input = 1;
      string custom_field = 2;
    }
    
    message ExamplePluginExpression {
      Expression child = 1;
      string custom_field = 2;
    }
    ```
    
    Second, define the necessary `RelationPlugin` and `ExpressionPlugin` 
implementations.
    
    ```scala
    class ExampleRelationPlugin extends RelationPlugin {
      override def transform(
          relation: protobuf.Any,
          planner: SparkConnectPlanner): Option[LogicalPlan] = {
    
        if (!relation.is(classOf[proto.ExamplePluginRelation])) {
          return None
        }
        val plugin = relation.unpack(classOf[proto.ExamplePluginRelation])
        Some(planner.transformRelation(plugin.getInput))
      }
    }
    
    class ExampleExpressionPlugin extends ExpressionPlugin {
      override def transform(
          relation: protobuf.Any,
          planner: SparkConnectPlanner): Option[Expression] = {
        if (!relation.is(classOf[proto.ExamplePluginExpression])) {
          return None
        }
        val exp = relation.unpack(classOf[proto.ExamplePluginExpression])
        Some(
          Alias(planner.transformExpression(exp.getChild), exp.getCustomField)(explicitMetadata =
            None))
      }
    }
    ```
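
    A `CommandPlugin` follows the same pattern but implements `process()` and
    signals that it handled the command by returning `Some(())`. Below is a
    minimal sketch that mirrors the `ExampleCommandPlugin` used in
    `SparkConnectPluginRegistrySuite` (the local property is only set there for
    test verification):

    ```scala
    class ExampleCommandPlugin extends CommandPlugin {
      override def process(command: protobuf.Any, planner: SparkConnectPlanner): Option[Unit] = {
        if (!command.is(classOf[proto.ExamplePluginCommand])) {
          // Not this plugin's message type; let the next registered plugin try.
          return None
        }
        val cmd = command.unpack(classOf[proto.ExamplePluginCommand])
        // Commands do not produce a logical plan, only a side effect.
        SparkContext.getActive.get.setLocalProperty("testingProperty", cmd.getCustomField)
        Some(())
      }
    }
    ```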
    
    Now, on the client side, the new extensions simply have to be encoded into
    the `protobuf.Any` value to become available once the plugins are loaded.
    Below is an example of wrapping the custom message type into a standard
    `Relation` with a `Range` child.
    
    ```scala
    Relation
      .newBuilder()
      .setExtension(
        protobuf.Any.pack(
          proto.ExamplePluginRelation
            .newBuilder()
            .setInput(
              proto.Relation
                .newBuilder()
                .setRange(proto.Range
                  .newBuilder()
                  .setStart(0)
                  .setEnd(10)
                  .setStep(1)))
            .build()))
    ```
    
    When the plan is transformed, the custom extensions behave like any other
    built-in functionality of Spark.
    
    The code in `SparkConnectPluginRegistrySuite` provides a full, working
    end-to-end example.
    
    ### Why are the changes needed?
    Extensibility
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, adds a new extension mechanism to the Spark Connect protocol.
    
    ### How was this patch tested?
    Added test coverage for the plugin registry, and for end-to-end
    transformation and execution.
    
    Closes #39291 from grundprinzip/SPARK-41629.
    
    Authored-by: Martin Grund <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 .../src/main/protobuf/spark/connect/commands.proto |   6 +
 .../protobuf/spark/connect/example_plugins.proto   |  41 ++++
 .../main/protobuf/spark/connect/expressions.proto  |   5 +
 .../main/protobuf/spark/connect/relations.proto    |   4 +
 .../apache/spark/sql/connect/config/Connect.scala  |  36 +++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  60 ++++-
 .../spark/sql/connect/plugin/CommandPlugin.scala   |  34 +++
 .../sql/connect/plugin/ExpressionPlugin.scala      |  35 +++
 .../spark/sql/connect/plugin/RelationPlugin.scala  |  35 +++
 .../plugin/SparkConnectPluginRegistry.scala        | 179 ++++++++++++++
 .../plugin/SparkConnectPluginRegistrySuite.scala   | 259 +++++++++++++++++++++
 core/src/main/resources/error/error-classes.json   |  10 +
 python/pyspark/sql/connect/proto/commands_pb2.py   |  35 +--
 python/pyspark/sql/connect/proto/commands_pb2.pyi  |  14 +-
 .../sql/connect/proto/example_plugins_pb2.py       |  87 +++++++
 .../sql/connect/proto/example_plugins_pb2.pyi      | 112 +++++++++
 .../pyspark/sql/connect/proto/expressions_pb2.py   |  87 +++----
 .../pyspark/sql/connect/proto/expressions_pb2.pyi  |  13 ++
 python/pyspark/sql/connect/proto/relations_pb2.py  | 187 +++++++--------
 python/pyspark/sql/connect/proto/relations_pb2.pyi |  13 ++
 20 files changed, 1096 insertions(+), 156 deletions(-)

diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
index 650f4fb7fa1..ffacfc008a0 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/commands.proto
@@ -17,6 +17,7 @@
 
 syntax = 'proto3';
 
+import "google/protobuf/any.proto";
 import "spark/connect/relations.proto";
 import "spark/connect/types.proto";
 
@@ -32,6 +33,11 @@ message Command {
     CreateScalarFunction create_function = 1;
     WriteOperation write_operation = 2;
     CreateDataFrameViewCommand create_dataframe_view = 3;
+
+    // This field is used to mark extensions to the protocol. When plugins 
generate arbitrary
+    // Commands they can add them here. During the planning the correct 
resolution is done.
+    google.protobuf.Any extension = 999;
+
   }
 }
 
diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/example_plugins.proto
 
b/connector/connect/common/src/main/protobuf/spark/connect/example_plugins.proto
new file mode 100644
index 00000000000..03208c7a439
--- /dev/null
+++ 
b/connector/connect/common/src/main/protobuf/spark/connect/example_plugins.proto
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = 'proto3';
+
+import "spark/connect/relations.proto";
+import "spark/connect/expressions.proto";
+
+package spark.connect;
+
+option java_multiple_files = true;
+option java_package = "org.apache.spark.connect.proto";
+
+message ExamplePluginRelation {
+  Relation input = 1;
+  string custom_field = 2;
+
+}
+
+message ExamplePluginExpression {
+  Expression child = 1;
+  string custom_field = 2;
+}
+
+message ExamplePluginCommand {
+  string custom_field = 1;
+}
\ No newline at end of file
diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/expressions.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/expressions.proto
index fa2836702c6..3b1695041a7 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/expressions.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/expressions.proto
@@ -17,6 +17,7 @@
 
 syntax = 'proto3';
 
+import "google/protobuf/any.proto";
 import "spark/connect/types.proto";
 
 package spark.connect;
@@ -42,6 +43,10 @@ message Expression {
     Window window = 11;
     UnresolvedExtractValue unresolved_extract_value = 12;
     UpdateFields update_fields = 13;
+
+    // This field is used to mark extensions to the protocol. When plugins 
generate arbitrary
+    // relations they can add them here. During the planning the correct 
resolution is done.
+    google.protobuf.Any extension = 999;
   }
 
 
diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index 8a604f0702c..afff04f8f0d 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -19,6 +19,7 @@ syntax = 'proto3';
 
 package spark.connect;
 
+import "google/protobuf/any.proto";
 import "spark/connect/expressions.proto";
 import "spark/connect/types.proto";
 import "spark/connect/catalog.proto";
@@ -75,6 +76,9 @@ message Relation {
     // Catalog API (experimental / unstable)
     Catalog catalog = 200;
 
+    // This field is used to mark extensions to the protocol. When plugins 
generate arbitrary
+    // relations they can add them here. During the planning the correct 
resolution is done.
+    google.protobuf.Any extension = 998;
     Unknown unknown = 999;
   }
 }
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 60fdd964018..3f255378b97 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -45,4 +45,40 @@ private[spark] object Connect {
       .version("3.4.0")
       .bytesConf(ByteUnit.MiB)
       .createWithDefaultString("4m")
+
+  val CONNECT_EXTENSIONS_RELATION_CLASSES =
+    ConfigBuilder("spark.connect.extensions.relation.classes")
+      .doc("""
+          |Comma separated list of classes that implement the trait
+          |org.apache.spark.sql.connect.plugin.RelationPlugin to support custom
+          |Relation types in proto.
+          |""".stripMargin)
+      .version("3.4.0")
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
+
+  val CONNECT_EXTENSIONS_EXPRESSION_CLASSES =
+    ConfigBuilder("spark.connect.extensions.expression.classes")
+      .doc("""
+          |Comma separated list of classes that implement the trait
+          |org.apache.spark.sql.connect.plugin.ExpressionPlugin to support 
custom
+          |Expression types in proto.
+          |""".stripMargin)
+      .version("3.4.0")
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
+
+  val CONNECT_EXTENSIONS_COMMAND_CLASSES =
+    ConfigBuilder("spark.connect.extensions.command.classes")
+      .doc("""
+             |Comma separated list of classes that implement the trait
+             |org.apache.spark.sql.connect.plugin.CommandPlugin to support 
custom
+             |Command types in proto.
+             |""".stripMargin)
+      .version("3.4.0")
+      .stringConf
+      .toSequence
+      .createWithDefault(Nil)
 }
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index bb582e92755..7d6fdc2883e 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import com.google.common.collect.{Lists, Maps}
+import com.google.protobuf.{Any => ProtoAny}
 
 import org.apache.spark.TaskContext
 import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
@@ -31,10 +32,12 @@ import 
org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, Mu
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, 
ParseException, ParserUtils}
-import org.apache.spark.sql.catalyst.plans.{logical, Cross, FullOuter, Inner, 
JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter, UsingJoin}
+import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, JoinType, 
LeftAnti, LeftOuter, LeftSemi, RightOuter, UsingJoin}
+import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Except, 
Intersect, LocalRelation, LogicalPlan, Sample, Sort, SubqueryAlias, Union, 
Unpivot, UnresolvedHint}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import 
org.apache.spark.sql.connect.planner.LiteralValueProtoConverter.{toCatalystExpression,
 toCatalystValue}
+import org.apache.spark.sql.connect.plugin.SparkConnectPluginRegistry
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.arrow.ArrowConverters
@@ -108,10 +111,25 @@ class SparkConnectPlanner(session: SparkSession) {
       // Catalog API (internal-only)
       case proto.Relation.RelTypeCase.CATALOG => 
transformCatalog(rel.getCatalog)
 
+      // Handle plugins for Spark Connect Relation types.
+      case proto.Relation.RelTypeCase.EXTENSION =>
+        transformRelationPlugin(rel.getExtension)
       case _ => throw InvalidPlanInput(s"${rel.getUnknown} not supported.")
     }
   }
 
+  def transformRelationPlugin(extension: ProtoAny): LogicalPlan = {
+    SparkConnectPluginRegistry.relationRegistry
+      // Lazily traverse the collection.
+      .view
+      // Apply the transformation.
+      .map(p => p.transform(extension, this))
+      // Find the first non-empty transformation or throw.
+      .find(_.nonEmpty)
+      .flatten
+      .getOrElse(throw InvalidPlanInput("No handler found for extension"))
+  }
+
   private def transformCatalog(catalog: proto.Catalog): LogicalPlan = {
     catalog.getCatTypeCase match {
       case proto.Catalog.CatTypeCase.CURRENT_DATABASE =>
@@ -592,7 +610,17 @@ class SparkConnectPlanner(session: SparkSession) {
     
UnresolvedAttribute.quotedString(exp.getUnresolvedAttribute.getUnparsedIdentifier)
   }
 
-  private def transformExpression(exp: proto.Expression): Expression = {
+  /**
+   * Transforms an input protobuf expression into the Catalyst expression. 
This is usually not
+   * called directly. Typically the planner will traverse the expressions 
automatically, only
+   * plugins are expected to manually perform expression transformations.
+   *
+   * @param exp
+   *   the input expression
+   * @return
+   *   Catalyst expression
+   */
+  def transformExpression(exp: proto.Expression): Expression = {
     exp.getExprTypeCase match {
       case proto.Expression.ExprTypeCase.LITERAL => 
transformLiteral(exp.getLiteral)
       case proto.Expression.ExprTypeCase.UNRESOLVED_ATTRIBUTE =>
@@ -617,12 +645,26 @@ class SparkConnectPlanner(session: SparkSession) {
         transformLambdaFunction(exp.getLambdaFunction)
       case proto.Expression.ExprTypeCase.WINDOW =>
         transformWindowExpression(exp.getWindow)
+      case proto.Expression.ExprTypeCase.EXTENSION =>
+        transformExpressionPlugin(exp.getExtension)
       case _ =>
         throw InvalidPlanInput(
           s"Expression with ID: ${exp.getExprTypeCase.getNumber} is not 
supported")
     }
   }
 
+  def transformExpressionPlugin(extension: ProtoAny): Expression = {
+    SparkConnectPluginRegistry.expressionRegistry
+      // Lazily traverse the collection.
+      .view
+      // Apply the transformation.
+      .map(p => p.transform(extension, this))
+      // Find the first non-empty transformation or throw.
+      .find(_.nonEmpty)
+      .flatten
+      .getOrElse(throw InvalidPlanInput("No handler found for extension"))
+  }
+
   /**
    * Transforms the protocol buffers literals into the appropriate Catalyst 
literal expression.
    * @return
@@ -1138,10 +1180,24 @@ class SparkConnectPlanner(session: SparkSession) {
         handleWriteOperation(command.getWriteOperation)
       case proto.Command.CommandTypeCase.CREATE_DATAFRAME_VIEW =>
         handleCreateViewCommand(command.getCreateDataframeView)
+      case proto.Command.CommandTypeCase.EXTENSION =>
+        handleCommandPlugin(command.getExtension)
       case _ => throw new UnsupportedOperationException(s"$command not 
supported.")
     }
   }
 
+  private def handleCommandPlugin(extension: ProtoAny): Unit = {
+    SparkConnectPluginRegistry.commandRegistry
+      // Lazily traverse the collection.
+      .view
+      // Apply the transformation.
+      .map(p => p.process(extension, this))
+      // Find the first non-empty transformation or throw.
+      .find(_.nonEmpty)
+      .flatten
+      .getOrElse(throw InvalidPlanInput("No handler found for extension"))
+  }
+
   /**
    * This is a helper function that registers a new Python function in the 
SparkSession.
    *
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/plugin/CommandPlugin.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/plugin/CommandPlugin.scala
new file mode 100644
index 00000000000..839a774062f
--- /dev/null
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/plugin/CommandPlugin.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.plugin
+
+import com.google.protobuf
+
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+
+/**
+ * Behavior trait for supporting extension mechanisms for the Spark Connect 
planner.
+ *
+ * Classes implementing the trait must be trivially constructable and should 
not rely on internal
+ * state. Every registered extension will be passed the Any instance. If the 
plugin supports
+ * handling this type it is responsible of constructing the logical expression 
from this object
+ * and if necessary traverse it's children.
+ */
+trait CommandPlugin {
+  def process(command: protobuf.Any, planner: SparkConnectPlanner): 
Option[Unit]
+}
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/plugin/ExpressionPlugin.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/plugin/ExpressionPlugin.scala
new file mode 100644
index 00000000000..78473122656
--- /dev/null
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/plugin/ExpressionPlugin.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.plugin
+
+import com.google.protobuf
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+
+/**
+ * Behavior trait for supporting extension mechanisms for the Spark Connect 
planner.
+ *
+ * Classes implementing the trait must be trivially constructable and should 
not rely on internal
+ * state. Every registered extension will be passed the Any instance. If the 
plugin supports
+ * handling this type it is responsible of constructing the logical expression 
from this object
+ * and if necessary traverse it's children.
+ */
+trait ExpressionPlugin {
+  def transform(relation: protobuf.Any, planner: SparkConnectPlanner): 
Option[Expression]
+}
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/plugin/RelationPlugin.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/plugin/RelationPlugin.scala
new file mode 100644
index 00000000000..b583c6456d2
--- /dev/null
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/plugin/RelationPlugin.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.plugin
+
+import com.google.protobuf
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+
+/**
+ * Behavior trait for supporting extension mechanisms for the Spark Connect 
planner.
+ *
+ * Classes implementing the trait must be trivially constructable and should 
not rely on internal
+ * state. Every registered extension will be passed the Any instance. If the 
plugin supports
+ * handling this type it is responsible of constructing the logical catalyst 
plan from this object
+ * and if necessary traverse it's children.
+ */
+trait RelationPlugin {
+  def transform(relation: protobuf.Any, planner: SparkConnectPlanner): 
Option[LogicalPlan]
+}
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistry.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistry.scala
new file mode 100644
index 00000000000..1169d7023e6
--- /dev/null
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistry.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.plugin
+
+import java.lang.reflect.InvocationTargetException
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.util.Utils
+
+/**
+ * This object provides a global list of configured relation and expression 
plugins for Spark
+ * Connect. The plugins are used to handle custom message types.
+ */
+object SparkConnectPluginRegistry {
+
+  // Contains the list of configured interceptors.
+  private lazy val relationPluginChain: Seq[relationPluginBuilder] = Seq(
+    // Adding a new plugin at compile time works like the example below:
+    // relation[DummyRelationPlugin](classOf[DummyRelationPlugin])
+  )
+
+  private lazy val expressionPluginChain: Seq[expressionPluginBuilder] = Seq(
+    // Adding a new plugin at compile time works like the example below:
+    // expression[DummyExpressionPlugin](classOf[DummyExpressionPlugin])
+  )
+
+  private lazy val commandPluginChain: Seq[commandPluginBuilder] = Seq(
+    // Adding a new plugin at compile time works like the example below:
+    // expression[DummyExpressionPlugin](classOf[DummyExpressionPlugin])
+  )
+
+  private var initialized = false
+  private var relationRegistryCache: Seq[RelationPlugin] = Seq.empty
+  private var expressionRegistryCache: Seq[ExpressionPlugin] = Seq.empty
+  private var commandRegistryCache: Seq[CommandPlugin] = Seq.empty
+
+  // Type used to identify the closure responsible to instantiate a 
ServerInterceptor.
+  type relationPluginBuilder = () => RelationPlugin
+  type expressionPluginBuilder = () => ExpressionPlugin
+  type commandPluginBuilder = () => CommandPlugin
+
+  def relationRegistry: Seq[RelationPlugin] = withInitialize {
+    relationRegistryCache
+  }
+  def expressionRegistry: Seq[ExpressionPlugin] = withInitialize {
+    expressionRegistryCache
+  }
+  def commandRegistry: Seq[CommandPlugin] = withInitialize {
+    commandRegistryCache
+  }
+
+  private def withInitialize[T](f: => Seq[T]): Seq[T] = {
+    synchronized {
+      if (!initialized) {
+        relationRegistryCache = loadRelationPlugins()
+        expressionRegistryCache = loadExpressionPlugins()
+        commandRegistryCache = loadCommandPlugins()
+        initialized = true
+      }
+    }
+    f
+  }
+
+  /**
+   * Only visible for testing. Should not be called from any other code path.
+   */
+  def reset(): Unit = {
+    synchronized {
+      initialized = false
+    }
+  }
+
+  /**
+   * Only visible for testing
+   */
+  private[connect] def loadRelationPlugins(): Seq[RelationPlugin] = {
+    relationPluginChain.map(x => x()) ++ 
createConfiguredPlugins[RelationPlugin](
+      SparkEnv.get.conf.get(Connect.CONNECT_EXTENSIONS_RELATION_CLASSES))
+  }
+
+  /**
+   * Only visible for testing
+   */
+  private[connect] def loadExpressionPlugins(): Seq[ExpressionPlugin] = {
+    expressionPluginChain.map(x => x()) ++ createConfiguredPlugins(
+      SparkEnv.get.conf.get(Connect.CONNECT_EXTENSIONS_EXPRESSION_CLASSES))
+  }
+
+  private[connect] def loadCommandPlugins(): Seq[CommandPlugin] = {
+    commandPluginChain.map(x => x()) ++ createConfiguredPlugins(
+      SparkEnv.get.conf.get(Connect.CONNECT_EXTENSIONS_COMMAND_CLASSES))
+  }
+
+  /**
+   * Exposed for testing only.
+   */
+  def createConfiguredPlugins[T](values: Seq[String]): Seq[T] = {
+    // Check all values from the Spark conf.
+    if (values.nonEmpty) {
+      values
+        .map(_.trim)
+        .filter(_.nonEmpty)
+        .map(Utils.classForName[T](_))
+        .map(createInstance(_))
+    } else {
+      Seq.empty
+    }
+  }
+
+  /**
+   * Creates a new instance of T using the default constructor.
+   * @param cls
+   * @tparam T
+   * @return
+   */
+  private def createInstance[B, T <: B](cls: Class[T]): B = {
+    val ctorOpt = cls.getConstructors.find(_.getParameterCount == 0)
+    if (ctorOpt.isEmpty) {
+      throw new SparkException(
+        errorClass = "CONNECT.PLUGIN_CTOR_MISSING",
+        messageParameters = Map("cls" -> cls.getName),
+        cause = null)
+    }
+    try {
+      ctorOpt.get.newInstance().asInstanceOf[T]
+    } catch {
+      case e: InvocationTargetException =>
+        throw new SparkException(
+          errorClass = "CONNECT.PLUGIN_RUNTIME_ERROR",
+          messageParameters = Map("msg" -> e.getTargetException.getMessage),
+          cause = e)
+      case e: Exception =>
+        throw new SparkException(
+          errorClass = "CONNECT.PLUGIN_RUNTIME_ERROR",
+          messageParameters = Map("msg" -> e.getMessage),
+          cause = e)
+    }
+  }
+
+  /**
+   * Creates a callable expression that instantiates the configured Relation 
plugin.
+   *
+   * Visible for testing only.
+   */
+  def relation[T <: RelationPlugin](cls: Class[T]): relationPluginBuilder =
+    () => createInstance[RelationPlugin, T](cls)
+
+  /**
+   * Creates a callable expression that instantiates the configured Expression 
plugin.
+   *
+   * Visible for testing only.
+   */
+  def expression[T <: ExpressionPlugin](cls: Class[T]): 
expressionPluginBuilder =
+    () => createInstance[ExpressionPlugin, T](cls)
+
+  /**
+   * Creates a callable expression that instantiates the configured Command 
plugin.
+   *
+   * Visible for testing only.
+   */
+  def command[T <: CommandPlugin](cls: Class[T]): commandPluginBuilder =
+    () => createInstance[CommandPlugin, T](cls)
+}
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala
new file mode 100644
index 00000000000..460fd32098c
--- /dev/null
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.plugin
+
+import com.google.protobuf
+
+import org.apache.spark.{SparkContext, SparkEnv, SparkException}
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.Relation
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.sql.connect.planner.{InvalidPlanInput, 
SparkConnectPlanner, SparkConnectPlanTest}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class DummyPlugin extends RelationPlugin {
+  override def transform(
+      relation: protobuf.Any,
+      planner: SparkConnectPlanner): Option[LogicalPlan] = None
+}
+
+class DummyExpressionPlugin extends ExpressionPlugin {
+  override def transform(
+      relation: protobuf.Any,
+      planner: SparkConnectPlanner): Option[Expression] = None
+}
+
+class DummyPluginNoTrivialCtor(id: Int) extends RelationPlugin {
+  override def transform(
+      relation: protobuf.Any,
+      planner: SparkConnectPlanner): Option[LogicalPlan] = None
+}
+
+class DummyPluginInstantiationError extends RelationPlugin {
+
+  throw new ArrayIndexOutOfBoundsException("Bad Plugin Error")
+
+  override def transform(
+      relation: protobuf.Any,
+      planner: SparkConnectPlanner): Option[LogicalPlan] = None
+}
+
+class ExampleRelationPlugin extends RelationPlugin {
+  override def transform(
+      relation: protobuf.Any,
+      planner: SparkConnectPlanner): Option[LogicalPlan] = {
+
+    if (!relation.is(classOf[proto.ExamplePluginRelation])) {
+      return None
+    }
+    val plugin = relation.unpack(classOf[proto.ExamplePluginRelation])
+    Some(planner.transformRelation(plugin.getInput))
+  }
+}
+
+class ExampleExpressionPlugin extends ExpressionPlugin {
+  override def transform(
+      relation: protobuf.Any,
+      planner: SparkConnectPlanner): Option[Expression] = {
+    if (!relation.is(classOf[proto.ExamplePluginExpression])) {
+      return None
+    }
+    val exp = relation.unpack(classOf[proto.ExamplePluginExpression])
+    Some(
+      Alias(planner.transformExpression(exp.getChild), 
exp.getCustomField)(explicitMetadata =
+        None))
+  }
+}
+
+class ExampleCommandPlugin extends CommandPlugin {
+  override def process(command: protobuf.Any, planner: SparkConnectPlanner): 
Option[Unit] = {
+    if (!command.is(classOf[proto.ExamplePluginCommand])) {
+      return None
+    }
+    val cmd = command.unpack(classOf[proto.ExamplePluginCommand])
+    SparkContext.getActive.get.setLocalProperty("testingProperty", 
cmd.getCustomField)
+    Some()
+  }
+}
+
+class SparkConnectPluginRegistrySuite extends SharedSparkSession with 
SparkConnectPlanTest {
+
+  override def beforeEach(): Unit = {
+    if 
(SparkEnv.get.conf.contains(Connect.CONNECT_EXTENSIONS_EXPRESSION_CLASSES)) {
+      SparkEnv.get.conf.remove(Connect.CONNECT_EXTENSIONS_EXPRESSION_CLASSES)
+    }
+    if 
(SparkEnv.get.conf.contains(Connect.CONNECT_EXTENSIONS_RELATION_CLASSES)) {
+      SparkEnv.get.conf.remove(Connect.CONNECT_EXTENSIONS_RELATION_CLASSES)
+    }
+    if 
(SparkEnv.get.conf.contains(Connect.CONNECT_EXTENSIONS_COMMAND_CLASSES)) {
+      SparkEnv.get.conf.remove(Connect.CONNECT_EXTENSIONS_COMMAND_CLASSES)
+    }
+    SparkConnectPluginRegistry.reset()
+  }
+
+  def withSparkConf(pairs: (String, String)*)(f: => Unit): Unit = {
+    val conf = SparkEnv.get.conf
+    pairs.foreach { kv => conf.set(kv._1, kv._2) }
+    try f
+    finally {
+      pairs.foreach { kv => conf.remove(kv._1) }
+    }
+  }
+
+  def buildRelation(): proto.Relation = {
+    val input = Relation
+      .newBuilder()
+      .setExtension(
+        protobuf.Any.pack(
+          proto.ExamplePluginRelation
+            .newBuilder()
+            .setInput(
+              proto.Relation
+                .newBuilder()
+                .setRange(proto.Range
+                  .newBuilder()
+                  .setStart(0)
+                  .setEnd(10)
+                  .setStep(1)))
+            .build()))
+    Relation
+      .newBuilder()
+      .setProject(
+        proto.Project
+          .newBuilder()
+          .addExpressions(
+            proto.Expression
+              .newBuilder()
+              .setExtension(
+                protobuf.Any.pack(
+                  proto.ExamplePluginExpression
+                    .newBuilder()
+                    .setChild(proto.Expression
+                      .newBuilder()
+                      
.setUnresolvedAttribute(proto.Expression.UnresolvedAttribute
+                        .newBuilder()
+                        .setUnparsedIdentifier("id")))
+                    .setCustomField("martin")
+                    .build())))
+          .setInput(input))
+      .build()
+  }
+
+  test("end to end with no extensions configured") {
+    assertThrows[InvalidPlanInput] {
+      transform(buildRelation())
+    }
+
+  }
+
+  test("End to end Relation plugin test") {
+    withSparkConf(
+      Connect.CONNECT_EXTENSIONS_RELATION_CLASSES.key ->
+        "org.apache.spark.sql.connect.plugin.ExampleRelationPlugin",
+      Connect.CONNECT_EXTENSIONS_EXPRESSION_CLASSES.key ->
+        "org.apache.spark.sql.connect.plugin.ExampleExpressionPlugin") {
+      val plan = transform(buildRelation())
+      val ds = Dataset.ofRows(spark, plan)
+      val result = ds.collect()
+      assert(result.length == 10)
+      assert(result(0).schema.fieldNames(0) == "martin")
+    }
+  }
+
+  test("End to end Command test") {
+    withSparkConf(
+      Connect.CONNECT_EXTENSIONS_COMMAND_CLASSES.key ->
+        "org.apache.spark.sql.connect.plugin.ExampleCommandPlugin") {
+      spark.sparkContext.setLocalProperty("testingProperty", "notset")
+      val plan = proto.Command
+        .newBuilder()
+        .setExtension(
+          protobuf.Any.pack(
+            proto.ExamplePluginCommand
+              .newBuilder()
+              .setCustomField("Martin")
+              .build()))
+        .build()
+
+      new SparkConnectPlanner(spark).process(plan)
+      
assert(spark.sparkContext.getLocalProperty("testingProperty").equals("Martin"))
+    }
+  }
+
+  test("Exception handling for plugin classes") {
+    withSparkConf(
+      Connect.CONNECT_EXTENSIONS_RELATION_CLASSES.key ->
+        "org.apache.spark.sql.connect.plugin.DummyPluginNoTrivialCtor") {
+      checkError(
+        exception = intercept[SparkException] {
+          SparkConnectPluginRegistry.loadRelationPlugins()
+        },
+        errorClass = "CONNECT.PLUGIN_CTOR_MISSING",
+        parameters = Map("cls" -> 
"org.apache.spark.sql.connect.plugin.DummyPluginNoTrivialCtor"))
+    }
+
+    withSparkConf(
+      Connect.CONNECT_EXTENSIONS_RELATION_CLASSES.key ->
+        "org.apache.spark.sql.connect.plugin.DummyPluginInstantiationError") {
+      checkError(
+        exception = intercept[SparkException] {
+          SparkConnectPluginRegistry.loadRelationPlugins()
+        },
+        errorClass = "CONNECT.PLUGIN_RUNTIME_ERROR",
+        parameters = Map("msg" -> "Bad Plugin Error"))
+    }
+  }
+
+  test("Emtpy registries are really empty and work") {
+    assert(SparkConnectPluginRegistry.loadRelationPlugins().isEmpty)
+    assert(SparkConnectPluginRegistry.loadExpressionPlugins().isEmpty)
+    assert(SparkConnectPluginRegistry.loadCommandPlugins().isEmpty)
+  }
+
+  test("Building builders using factory methods") {
+    val x = 
SparkConnectPluginRegistry.relation[DummyPlugin](classOf[DummyPlugin])
+    assert(x != null)
+    assert(x().isInstanceOf[RelationPlugin])
+    val y =
+      
SparkConnectPluginRegistry.expression[DummyExpressionPlugin](classOf[DummyExpressionPlugin])
+    assert(y != null)
+    assert(y().isInstanceOf[ExpressionPlugin])
+  }
+
+  test("Configured class not found is properly thrown") {
+    withSparkConf(
+      Connect.CONNECT_EXTENSIONS_EXPRESSION_CLASSES.key -> 
"this.class.does.not.exist") {
+      assertThrows[ClassNotFoundException] {
+        SparkConnectPluginRegistry.createConfiguredPlugins(
+          SparkEnv.get.conf.get(Connect.CONNECT_EXTENSIONS_EXPRESSION_CLASSES))
+      }
+    }
+
+    withSparkConf(
+      Connect.CONNECT_EXTENSIONS_RELATION_CLASSES.key -> 
"this.class.does.not.exist") {
+      assertThrows[ClassNotFoundException] {
+        SparkConnectPluginRegistry.createConfiguredPlugins(
+          SparkEnv.get.conf.get(Connect.CONNECT_EXTENSIONS_RELATION_CLASSES))
+      }
+    }
+  }
+
+}
diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 67398a30180..4003fab0685 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -139,6 +139,16 @@
         "message" : [
           "Error instantiating GRPC interceptor: <msg>"
         ]
+      },
+      "PLUGIN_CTOR_MISSING" : {
+        "message" : [
+          "Cannot instantiate Spark Connect plugin because <cls> is missing a 
default constructor without arguments."
+        ]
+      },
+      "PLUGIN_RUNTIME_ERROR" : {
+        "message" : [
+          "Error instantiating Spark Connect plugin: <msg>"
+        ]
       }
     }
   },
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py 
b/python/pyspark/sql/connect/proto/commands_pb2.py
index e7069a4b9a7..bc52ce6e763 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.py
+++ b/python/pyspark/sql/connect/proto/commands_pb2.py
@@ -29,12 +29,13 @@ from google.protobuf import symbol_database as 
_symbol_database
 _sym_db = _symbol_database.Default()
 
 
+from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2
 from pyspark.sql.connect.proto import relations_pb2 as 
spark_dot_connect_dot_relations__pb2
 from pyspark.sql.connect.proto import types_pb2 as 
spark_dot_connect_dot_types__pb2
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    
b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x94\x02\n\x07\x43ommand\x12N\n\x0f\x63reate_function\x18\x01
 
\x01(\x0b\x32#.spark.connect.CreateScalarFunctionH\x00R\x0e\x63reateFunction\x12H\n\x0fwrite_operation\x18\x02
 
\x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x18\x03
 \x01(\x0b\x32).spark.connect.CreateDataFrameViewCommandH\x00R\x13\x63reateD 
[...]
+    
b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xcb\x02\n\x07\x43ommand\x12N\n\x0f\x63reate_function\x18\x01
 
\x01(\x0b\x32#.spark.connect.CreateScalarFunctionH\x00R\x0e\x63reateFunction\x12H\n\x0fwrite_operation\x18\x02
 
\x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x18\x03
 \x01(\x0b\x32).spark.connect.CreateDataFra [...]
 )
 
 
@@ -118,20 +119,20 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     DESCRIPTOR._serialized_options = 
b"\n\036org.apache.spark.connect.protoP\001"
     _WRITEOPERATION_OPTIONSENTRY._options = None
     _WRITEOPERATION_OPTIONSENTRY._serialized_options = b"8\001"
-    _COMMAND._serialized_start = 106
-    _COMMAND._serialized_end = 382
-    _CREATESCALARFUNCTION._serialized_start = 385
-    _CREATESCALARFUNCTION._serialized_end = 920
-    _CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_start = 758
-    _CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_end = 897
-    _CREATEDATAFRAMEVIEWCOMMAND._serialized_start = 923
-    _CREATEDATAFRAMEVIEWCOMMAND._serialized_end = 1073
-    _WRITEOPERATION._serialized_start = 1076
-    _WRITEOPERATION._serialized_end = 1818
-    _WRITEOPERATION_OPTIONSENTRY._serialized_start = 1514
-    _WRITEOPERATION_OPTIONSENTRY._serialized_end = 1572
-    _WRITEOPERATION_BUCKETBY._serialized_start = 1574
-    _WRITEOPERATION_BUCKETBY._serialized_end = 1665
-    _WRITEOPERATION_SAVEMODE._serialized_start = 1668
-    _WRITEOPERATION_SAVEMODE._serialized_end = 1805
+    _COMMAND._serialized_start = 133
+    _COMMAND._serialized_end = 464
+    _CREATESCALARFUNCTION._serialized_start = 467
+    _CREATESCALARFUNCTION._serialized_end = 1002
+    _CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_start = 840
+    _CREATESCALARFUNCTION_FUNCTIONLANGUAGE._serialized_end = 979
+    _CREATEDATAFRAMEVIEWCOMMAND._serialized_start = 1005
+    _CREATEDATAFRAMEVIEWCOMMAND._serialized_end = 1155
+    _WRITEOPERATION._serialized_start = 1158
+    _WRITEOPERATION._serialized_end = 1900
+    _WRITEOPERATION_OPTIONSENTRY._serialized_start = 1596
+    _WRITEOPERATION_OPTIONSENTRY._serialized_end = 1654
+    _WRITEOPERATION_BUCKETBY._serialized_start = 1656
+    _WRITEOPERATION_BUCKETBY._serialized_end = 1747
+    _WRITEOPERATION_SAVEMODE._serialized_start = 1750
+    _WRITEOPERATION_SAVEMODE._serialized_end = 1887
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi 
b/python/pyspark/sql/connect/proto/commands_pb2.pyi
index c3c7d31222b..2cebbf47c23 100644
--- a/python/pyspark/sql/connect/proto/commands_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/commands_pb2.pyi
@@ -35,6 +35,7 @@ limitations under the License.
 """
 import builtins
 import collections.abc
+import google.protobuf.any_pb2
 import google.protobuf.descriptor
 import google.protobuf.internal.containers
 import google.protobuf.internal.enum_type_wrapper
@@ -61,18 +62,25 @@ class Command(google.protobuf.message.Message):
     CREATE_FUNCTION_FIELD_NUMBER: builtins.int
     WRITE_OPERATION_FIELD_NUMBER: builtins.int
     CREATE_DATAFRAME_VIEW_FIELD_NUMBER: builtins.int
+    EXTENSION_FIELD_NUMBER: builtins.int
     @property
     def create_function(self) -> global___CreateScalarFunction: ...
     @property
     def write_operation(self) -> global___WriteOperation: ...
     @property
     def create_dataframe_view(self) -> global___CreateDataFrameViewCommand: ...
+    @property
+    def extension(self) -> google.protobuf.any_pb2.Any:
+        """This field is used to mark extensions to the protocol. When plugins 
generate arbitrary
+        Commands they can add them here. During the planning the correct 
resolution is done.
+        """
     def __init__(
         self,
         *,
         create_function: global___CreateScalarFunction | None = ...,
         write_operation: global___WriteOperation | None = ...,
         create_dataframe_view: global___CreateDataFrameViewCommand | None = 
...,
+        extension: google.protobuf.any_pb2.Any | None = ...,
     ) -> None: ...
     def HasField(
         self,
@@ -83,6 +91,8 @@ class Command(google.protobuf.message.Message):
             b"create_dataframe_view",
             "create_function",
             b"create_function",
+            "extension",
+            b"extension",
             "write_operation",
             b"write_operation",
         ],
@@ -96,6 +106,8 @@ class Command(google.protobuf.message.Message):
             b"create_dataframe_view",
             "create_function",
             b"create_function",
+            "extension",
+            b"extension",
             "write_operation",
             b"write_operation",
         ],
@@ -103,7 +115,7 @@ class Command(google.protobuf.message.Message):
     def WhichOneof(
         self, oneof_group: typing_extensions.Literal["command_type", 
b"command_type"]
     ) -> typing_extensions.Literal[
-        "create_function", "write_operation", "create_dataframe_view"
+        "create_function", "write_operation", "create_dataframe_view", 
"extension"
     ] | None: ...
 
 global___Command = Command
diff --git a/python/pyspark/sql/connect/proto/example_plugins_pb2.py 
b/python/pyspark/sql/connect/proto/example_plugins_pb2.py
new file mode 100644
index 00000000000..4223fc91a69
--- /dev/null
+++ b/python/pyspark/sql/connect/proto/example_plugins_pb2.py
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: spark/connect/example_plugins.proto
+"""Generated protocol buffer code."""
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+from pyspark.sql.connect.proto import relations_pb2 as 
spark_dot_connect_dot_relations__pb2
+from pyspark.sql.connect.proto import expressions_pb2 as 
spark_dot_connect_dot_expressions__pb2
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
+    
b'\n#spark/connect/example_plugins.proto\x12\rspark.connect\x1a\x1dspark/connect/relations.proto\x1a\x1fspark/connect/expressions.proto"i\n\x15\x45xamplePluginRelation\x12-\n\x05input\x18\x01
 
\x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12!\n\x0c\x63ustom_field\x18\x02
 
\x01(\tR\x0b\x63ustomField"m\n\x17\x45xamplePluginExpression\x12/\n\x05\x63hild\x18\x01
 
\x01(\x0b\x32\x19.spark.connect.ExpressionR\x05\x63hild\x12!\n\x0c\x63ustom_field\x18\x02
 \x01(\tR\x0b\x63ustomField"9\n\x1 [...]
+)
+
+
+_EXAMPLEPLUGINRELATION = 
DESCRIPTOR.message_types_by_name["ExamplePluginRelation"]
+_EXAMPLEPLUGINEXPRESSION = 
DESCRIPTOR.message_types_by_name["ExamplePluginExpression"]
+_EXAMPLEPLUGINCOMMAND = 
DESCRIPTOR.message_types_by_name["ExamplePluginCommand"]
+ExamplePluginRelation = _reflection.GeneratedProtocolMessageType(
+    "ExamplePluginRelation",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _EXAMPLEPLUGINRELATION,
+        "__module__": "spark.connect.example_plugins_pb2"
+        # 
@@protoc_insertion_point(class_scope:spark.connect.ExamplePluginRelation)
+    },
+)
+_sym_db.RegisterMessage(ExamplePluginRelation)
+
+ExamplePluginExpression = _reflection.GeneratedProtocolMessageType(
+    "ExamplePluginExpression",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _EXAMPLEPLUGINEXPRESSION,
+        "__module__": "spark.connect.example_plugins_pb2"
+        # 
@@protoc_insertion_point(class_scope:spark.connect.ExamplePluginExpression)
+    },
+)
+_sym_db.RegisterMessage(ExamplePluginExpression)
+
+ExamplePluginCommand = _reflection.GeneratedProtocolMessageType(
+    "ExamplePluginCommand",
+    (_message.Message,),
+    {
+        "DESCRIPTOR": _EXAMPLEPLUGINCOMMAND,
+        "__module__": "spark.connect.example_plugins_pb2"
+        # 
@@protoc_insertion_point(class_scope:spark.connect.ExamplePluginCommand)
+    },
+)
+_sym_db.RegisterMessage(ExamplePluginCommand)
+
+if _descriptor._USE_C_DESCRIPTORS == False:
+
+    DESCRIPTOR._options = None
+    DESCRIPTOR._serialized_options = 
b"\n\036org.apache.spark.connect.protoP\001"
+    _EXAMPLEPLUGINRELATION._serialized_start = 118
+    _EXAMPLEPLUGINRELATION._serialized_end = 223
+    _EXAMPLEPLUGINEXPRESSION._serialized_start = 225
+    _EXAMPLEPLUGINEXPRESSION._serialized_end = 334
+    _EXAMPLEPLUGINCOMMAND._serialized_start = 336
+    _EXAMPLEPLUGINCOMMAND._serialized_end = 393
+# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/example_plugins_pb2.pyi 
b/python/pyspark/sql/connect/proto/example_plugins_pb2.pyi
new file mode 100644
index 00000000000..1be966ff1e4
--- /dev/null
+++ b/python/pyspark/sql/connect/proto/example_plugins_pb2.pyi
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+@generated by mypy-protobuf.  Do not edit manually!
+isort:skip_file
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import builtins
+import google.protobuf.descriptor
+import google.protobuf.message
+import pyspark.sql.connect.proto.expressions_pb2
+import pyspark.sql.connect.proto.relations_pb2
+import sys
+
+if sys.version_info >= (3, 8):
+    import typing as typing_extensions
+else:
+    import typing_extensions
+
+DESCRIPTOR: google.protobuf.descriptor.FileDescriptor
+
+class ExamplePluginRelation(google.protobuf.message.Message):
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    INPUT_FIELD_NUMBER: builtins.int
+    CUSTOM_FIELD_FIELD_NUMBER: builtins.int
+    @property
+    def input(self) -> pyspark.sql.connect.proto.relations_pb2.Relation: ...
+    custom_field: builtins.str
+    def __init__(
+        self,
+        *,
+        input: pyspark.sql.connect.proto.relations_pb2.Relation | None = ...,
+        custom_field: builtins.str = ...,
+    ) -> None: ...
+    def HasField(
+        self, field_name: typing_extensions.Literal["input", b"input"]
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal["custom_field", b"custom_field", 
"input", b"input"],
+    ) -> None: ...
+
+global___ExamplePluginRelation = ExamplePluginRelation
+
+class ExamplePluginExpression(google.protobuf.message.Message):
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    CHILD_FIELD_NUMBER: builtins.int
+    CUSTOM_FIELD_FIELD_NUMBER: builtins.int
+    @property
+    def child(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression: 
...
+    custom_field: builtins.str
+    def __init__(
+        self,
+        *,
+        child: pyspark.sql.connect.proto.expressions_pb2.Expression | None = 
...,
+        custom_field: builtins.str = ...,
+    ) -> None: ...
+    def HasField(
+        self, field_name: typing_extensions.Literal["child", b"child"]
+    ) -> builtins.bool: ...
+    def ClearField(
+        self,
+        field_name: typing_extensions.Literal["child", b"child", 
"custom_field", b"custom_field"],
+    ) -> None: ...
+
+global___ExamplePluginExpression = ExamplePluginExpression
+
+class ExamplePluginCommand(google.protobuf.message.Message):
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    CUSTOM_FIELD_FIELD_NUMBER: builtins.int
+    custom_field: builtins.str
+    def __init__(
+        self,
+        *,
+        custom_field: builtins.str = ...,
+    ) -> None: ...
+    def ClearField(
+        self, field_name: typing_extensions.Literal["custom_field", 
b"custom_field"]
+    ) -> None: ...
+
+global___ExamplePluginCommand = ExamplePluginCommand
diff --git a/python/pyspark/sql/connect/proto/expressions_pb2.py 
b/python/pyspark/sql/connect/proto/expressions_pb2.py
index 01c24d1bcd9..c59eb9e2577 100644
--- a/python/pyspark/sql/connect/proto/expressions_pb2.py
+++ b/python/pyspark/sql/connect/proto/expressions_pb2.py
@@ -29,11 +29,12 @@ from google.protobuf import symbol_database as _symbol_database
 _sym_db = _symbol_database.Default()
 
 
+from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2
 from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__pb2
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    
b'\n\x1fspark/connect/expressions.proto\x12\rspark.connect\x1a\x19spark/connect/types.proto"\xb2!\n\nExpression\x12=\n\x07literal\x18\x01
 
\x01(\x0b\x32!.spark.connect.Expression.LiteralH\x00R\x07literal\x12\x62\n\x14unresolved_attribute\x18\x02
 
\x01(\x0b\x32-.spark.connect.Expression.UnresolvedAttributeH\x00R\x13unresolvedAttribute\x12_\n\x13unresolved_function\x18\x03
 
\x01(\x0b\x32,.spark.connect.Expression.UnresolvedFunctionH\x00R\x12unresolvedFunction\x12Y\n\x11\x65xpression_strin
 [...]
+    
b'\n\x1fspark/connect/expressions.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x19spark/connect/types.proto"\xe9!\n\nExpression\x12=\n\x07literal\x18\x01
 
\x01(\x0b\x32!.spark.connect.Expression.LiteralH\x00R\x07literal\x12\x62\n\x14unresolved_attribute\x18\x02
 
\x01(\x0b\x32-.spark.connect.Expression.UnresolvedAttributeH\x00R\x13unresolvedAttribute\x12_\n\x13unresolved_function\x18\x03
 
\x01(\x0b\x32,.spark.connect.Expression.UnresolvedFunctionH\x00R\x12unresolvedFunct
 [...]
 )
 
 
@@ -247,46 +248,46 @@ if _descriptor._USE_C_DESCRIPTORS == False:
 
     DESCRIPTOR._options = None
     DESCRIPTOR._serialized_options = b"\n\036org.apache.spark.connect.protoP\001"
-    _EXPRESSION._serialized_start = 78
-    _EXPRESSION._serialized_end = 4352
-    _EXPRESSION_WINDOW._serialized_start = 1132
-    _EXPRESSION_WINDOW._serialized_end = 1915
-    _EXPRESSION_WINDOW_WINDOWFRAME._serialized_start = 1422
-    _EXPRESSION_WINDOW_WINDOWFRAME._serialized_end = 1915
-    _EXPRESSION_WINDOW_WINDOWFRAME_FRAMEBOUNDARY._serialized_start = 1689
-    _EXPRESSION_WINDOW_WINDOWFRAME_FRAMEBOUNDARY._serialized_end = 1834
-    _EXPRESSION_WINDOW_WINDOWFRAME_FRAMETYPE._serialized_start = 1836
-    _EXPRESSION_WINDOW_WINDOWFRAME_FRAMETYPE._serialized_end = 1915
-    _EXPRESSION_SORTORDER._serialized_start = 1918
-    _EXPRESSION_SORTORDER._serialized_end = 2343
-    _EXPRESSION_SORTORDER_SORTDIRECTION._serialized_start = 2148
-    _EXPRESSION_SORTORDER_SORTDIRECTION._serialized_end = 2256
-    _EXPRESSION_SORTORDER_NULLORDERING._serialized_start = 2258
-    _EXPRESSION_SORTORDER_NULLORDERING._serialized_end = 2343
-    _EXPRESSION_CAST._serialized_start = 2346
-    _EXPRESSION_CAST._serialized_end = 2491
-    _EXPRESSION_LITERAL._serialized_start = 2494
-    _EXPRESSION_LITERAL._serialized_end = 3370
-    _EXPRESSION_LITERAL_DECIMAL._serialized_start = 3137
-    _EXPRESSION_LITERAL_DECIMAL._serialized_end = 3254
-    _EXPRESSION_LITERAL_CALENDARINTERVAL._serialized_start = 3256
-    _EXPRESSION_LITERAL_CALENDARINTERVAL._serialized_end = 3354
-    _EXPRESSION_UNRESOLVEDATTRIBUTE._serialized_start = 3372
-    _EXPRESSION_UNRESOLVEDATTRIBUTE._serialized_end = 3442
-    _EXPRESSION_UNRESOLVEDFUNCTION._serialized_start = 3445
-    _EXPRESSION_UNRESOLVEDFUNCTION._serialized_end = 3649
-    _EXPRESSION_EXPRESSIONSTRING._serialized_start = 3651
-    _EXPRESSION_EXPRESSIONSTRING._serialized_end = 3701
-    _EXPRESSION_UNRESOLVEDSTAR._serialized_start = 3703
-    _EXPRESSION_UNRESOLVEDSTAR._serialized_end = 3743
-    _EXPRESSION_UNRESOLVEDREGEX._serialized_start = 3745
-    _EXPRESSION_UNRESOLVEDREGEX._serialized_end = 3789
-    _EXPRESSION_UNRESOLVEDEXTRACTVALUE._serialized_start = 3792
-    _EXPRESSION_UNRESOLVEDEXTRACTVALUE._serialized_end = 3924
-    _EXPRESSION_UPDATEFIELDS._serialized_start = 3927
-    _EXPRESSION_UPDATEFIELDS._serialized_end = 4114
-    _EXPRESSION_ALIAS._serialized_start = 4116
-    _EXPRESSION_ALIAS._serialized_end = 4236
-    _EXPRESSION_LAMBDAFUNCTION._serialized_start = 4238
-    _EXPRESSION_LAMBDAFUNCTION._serialized_end = 4339
+    _EXPRESSION._serialized_start = 105
+    _EXPRESSION._serialized_end = 4434
+    _EXPRESSION_WINDOW._serialized_start = 1214
+    _EXPRESSION_WINDOW._serialized_end = 1997
+    _EXPRESSION_WINDOW_WINDOWFRAME._serialized_start = 1504
+    _EXPRESSION_WINDOW_WINDOWFRAME._serialized_end = 1997
+    _EXPRESSION_WINDOW_WINDOWFRAME_FRAMEBOUNDARY._serialized_start = 1771
+    _EXPRESSION_WINDOW_WINDOWFRAME_FRAMEBOUNDARY._serialized_end = 1916
+    _EXPRESSION_WINDOW_WINDOWFRAME_FRAMETYPE._serialized_start = 1918
+    _EXPRESSION_WINDOW_WINDOWFRAME_FRAMETYPE._serialized_end = 1997
+    _EXPRESSION_SORTORDER._serialized_start = 2000
+    _EXPRESSION_SORTORDER._serialized_end = 2425
+    _EXPRESSION_SORTORDER_SORTDIRECTION._serialized_start = 2230
+    _EXPRESSION_SORTORDER_SORTDIRECTION._serialized_end = 2338
+    _EXPRESSION_SORTORDER_NULLORDERING._serialized_start = 2340
+    _EXPRESSION_SORTORDER_NULLORDERING._serialized_end = 2425
+    _EXPRESSION_CAST._serialized_start = 2428
+    _EXPRESSION_CAST._serialized_end = 2573
+    _EXPRESSION_LITERAL._serialized_start = 2576
+    _EXPRESSION_LITERAL._serialized_end = 3452
+    _EXPRESSION_LITERAL_DECIMAL._serialized_start = 3219
+    _EXPRESSION_LITERAL_DECIMAL._serialized_end = 3336
+    _EXPRESSION_LITERAL_CALENDARINTERVAL._serialized_start = 3338
+    _EXPRESSION_LITERAL_CALENDARINTERVAL._serialized_end = 3436
+    _EXPRESSION_UNRESOLVEDATTRIBUTE._serialized_start = 3454
+    _EXPRESSION_UNRESOLVEDATTRIBUTE._serialized_end = 3524
+    _EXPRESSION_UNRESOLVEDFUNCTION._serialized_start = 3527
+    _EXPRESSION_UNRESOLVEDFUNCTION._serialized_end = 3731
+    _EXPRESSION_EXPRESSIONSTRING._serialized_start = 3733
+    _EXPRESSION_EXPRESSIONSTRING._serialized_end = 3783
+    _EXPRESSION_UNRESOLVEDSTAR._serialized_start = 3785
+    _EXPRESSION_UNRESOLVEDSTAR._serialized_end = 3825
+    _EXPRESSION_UNRESOLVEDREGEX._serialized_start = 3827
+    _EXPRESSION_UNRESOLVEDREGEX._serialized_end = 3871
+    _EXPRESSION_UNRESOLVEDEXTRACTVALUE._serialized_start = 3874
+    _EXPRESSION_UNRESOLVEDEXTRACTVALUE._serialized_end = 4006
+    _EXPRESSION_UPDATEFIELDS._serialized_start = 4009
+    _EXPRESSION_UPDATEFIELDS._serialized_end = 4196
+    _EXPRESSION_ALIAS._serialized_start = 4198
+    _EXPRESSION_ALIAS._serialized_end = 4318
+    _EXPRESSION_LAMBDAFUNCTION._serialized_start = 4320
+    _EXPRESSION_LAMBDAFUNCTION._serialized_end = 4421
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/expressions_pb2.pyi b/python/pyspark/sql/connect/proto/expressions_pb2.pyi
index 5e5eab5b5d9..d4087245a73 100644
--- a/python/pyspark/sql/connect/proto/expressions_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/expressions_pb2.pyi
@@ -35,6 +35,7 @@ limitations under the License.
 """
 import builtins
 import collections.abc
+import google.protobuf.any_pb2
 import google.protobuf.descriptor
 import google.protobuf.internal.containers
 import google.protobuf.internal.enum_type_wrapper
@@ -898,6 +899,7 @@ class Expression(google.protobuf.message.Message):
     WINDOW_FIELD_NUMBER: builtins.int
     UNRESOLVED_EXTRACT_VALUE_FIELD_NUMBER: builtins.int
     UPDATE_FIELDS_FIELD_NUMBER: builtins.int
+    EXTENSION_FIELD_NUMBER: builtins.int
     @property
     def literal(self) -> global___Expression.Literal: ...
     @property
@@ -924,6 +926,11 @@ class Expression(google.protobuf.message.Message):
     def unresolved_extract_value(self) -> global___Expression.UnresolvedExtractValue: ...
     @property
     def update_fields(self) -> global___Expression.UpdateFields: ...
+    @property
+    def extension(self) -> google.protobuf.any_pb2.Any:
+        """This field is used to mark extensions to the protocol. When plugins generate arbitrary
+        relations they can add them here. During the planning the correct resolution is done.
+        """
     def __init__(
         self,
         *,
@@ -940,6 +947,7 @@ class Expression(google.protobuf.message.Message):
         window: global___Expression.Window | None = ...,
         unresolved_extract_value: global___Expression.UnresolvedExtractValue | None = ...,
         update_fields: global___Expression.UpdateFields | None = ...,
+        extension: google.protobuf.any_pb2.Any | None = ...,
     ) -> None: ...
     def HasField(
         self,
@@ -952,6 +960,8 @@ class Expression(google.protobuf.message.Message):
             b"expr_type",
             "expression_string",
             b"expression_string",
+            "extension",
+            b"extension",
             "lambda_function",
             b"lambda_function",
             "literal",
@@ -985,6 +995,8 @@ class Expression(google.protobuf.message.Message):
             b"expr_type",
             "expression_string",
             b"expression_string",
+            "extension",
+            b"extension",
             "lambda_function",
             b"lambda_function",
             "literal",
@@ -1023,6 +1035,7 @@ class Expression(google.protobuf.message.Message):
         "window",
         "unresolved_extract_value",
         "update_fields",
+        "extension",
     ] | None: ...
 
 global___Expression = Expression
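
The new `extension` property above is a plain `google.protobuf.Any`, so a client can pack any plugin-defined message into it and a receiver can test and unpack it. A rough sketch, not part of this commit, again assuming the hypothetical `example_plugins_pb2` module for the example plugin stubs:

```python
# Rough sketch (not part of this commit); `example_plugins_pb2` is hypothetical.
import example_plugins_pb2
import pyspark.sql.connect.proto.expressions_pb2 as expressions_pb2

expr = expressions_pb2.Expression()
expr.extension.Pack(
    example_plugins_pb2.ExamplePluginExpression(custom_field="renamed")
)

# The extension participates in the existing expr_type oneof.
assert expr.WhichOneof("expr_type") == "extension"

# A receiver checks the packed type before unpacking it.
if expr.extension.Is(example_plugins_pb2.ExamplePluginExpression.DESCRIPTOR):
    payload = example_plugins_pb2.ExamplePluginExpression()
    expr.extension.Unpack(payload)
    print(payload.custom_field)  # -> "renamed"
```
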
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py
index 7c938831882..92a6d252a68 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -29,13 +29,14 @@ from google.protobuf import symbol_database as _symbol_database
 _sym_db = _symbol_database.Default()
 
 
+from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2
 from pyspark.sql.connect.proto import expressions_pb2 as spark_dot_connect_dot_expressions__pb2
 from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__pb2
 from pyspark.sql.connect.proto import catalog_pb2 as spark_dot_connect_dot_catalog__pb2
 
 
 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-    
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xed\x10\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
 
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
 
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 
\x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66ilter\x18\x04
 \x01(\x0b\x32\x15.spa [...]
+    
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xa4\x11\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
 
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
 
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 
\x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
 )
 
 
@@ -562,96 +563,96 @@ if _descriptor._USE_C_DESCRIPTORS == False:
     _READ_DATASOURCE_OPTIONSENTRY._serialized_options = b"8\001"
     _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._options = None
     _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_options = b"8\001"
-    _RELATION._serialized_start = 138
-    _RELATION._serialized_end = 2295
-    _UNKNOWN._serialized_start = 2297
-    _UNKNOWN._serialized_end = 2306
-    _RELATIONCOMMON._serialized_start = 2308
-    _RELATIONCOMMON._serialized_end = 2357
-    _SQL._serialized_start = 2359
-    _SQL._serialized_end = 2386
-    _READ._serialized_start = 2389
-    _READ._serialized_end = 2815
-    _READ_NAMEDTABLE._serialized_start = 2531
-    _READ_NAMEDTABLE._serialized_end = 2592
-    _READ_DATASOURCE._serialized_start = 2595
-    _READ_DATASOURCE._serialized_end = 2802
-    _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 2733
-    _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 2791
-    _PROJECT._serialized_start = 2817
-    _PROJECT._serialized_end = 2934
-    _FILTER._serialized_start = 2936
-    _FILTER._serialized_end = 3048
-    _JOIN._serialized_start = 3051
-    _JOIN._serialized_end = 3522
-    _JOIN_JOINTYPE._serialized_start = 3314
-    _JOIN_JOINTYPE._serialized_end = 3522
-    _SETOPERATION._serialized_start = 3525
-    _SETOPERATION._serialized_end = 3921
-    _SETOPERATION_SETOPTYPE._serialized_start = 3784
-    _SETOPERATION_SETOPTYPE._serialized_end = 3898
-    _LIMIT._serialized_start = 3923
-    _LIMIT._serialized_end = 3999
-    _OFFSET._serialized_start = 4001
-    _OFFSET._serialized_end = 4080
-    _TAIL._serialized_start = 4082
-    _TAIL._serialized_end = 4157
-    _AGGREGATE._serialized_start = 4160
-    _AGGREGATE._serialized_end = 4742
-    _AGGREGATE_PIVOT._serialized_start = 4499
-    _AGGREGATE_PIVOT._serialized_end = 4610
-    _AGGREGATE_GROUPTYPE._serialized_start = 4613
-    _AGGREGATE_GROUPTYPE._serialized_end = 4742
-    _SORT._serialized_start = 4745
-    _SORT._serialized_end = 4905
-    _DROP._serialized_start = 4907
-    _DROP._serialized_end = 5007
-    _DEDUPLICATE._serialized_start = 5010
-    _DEDUPLICATE._serialized_end = 5181
-    _LOCALRELATION._serialized_start = 5184
-    _LOCALRELATION._serialized_end = 5321
-    _SAMPLE._serialized_start = 5324
-    _SAMPLE._serialized_end = 5619
-    _RANGE._serialized_start = 5622
-    _RANGE._serialized_end = 5767
-    _SUBQUERYALIAS._serialized_start = 5769
-    _SUBQUERYALIAS._serialized_end = 5883
-    _REPARTITION._serialized_start = 5886
-    _REPARTITION._serialized_end = 6028
-    _SHOWSTRING._serialized_start = 6031
-    _SHOWSTRING._serialized_end = 6173
-    _STATSUMMARY._serialized_start = 6175
-    _STATSUMMARY._serialized_end = 6267
-    _STATDESCRIBE._serialized_start = 6269
-    _STATDESCRIBE._serialized_end = 6350
-    _STATCROSSTAB._serialized_start = 6352
-    _STATCROSSTAB._serialized_end = 6453
-    _STATCOV._serialized_start = 6455
-    _STATCOV._serialized_end = 6551
-    _STATCORR._serialized_start = 6554
-    _STATCORR._serialized_end = 6691
-    _NAFILL._serialized_start = 6694
-    _NAFILL._serialized_end = 6828
-    _NADROP._serialized_start = 6831
-    _NADROP._serialized_end = 6965
-    _NAREPLACE._serialized_start = 6968
-    _NAREPLACE._serialized_end = 7264
-    _NAREPLACE_REPLACEMENT._serialized_start = 7123
-    _NAREPLACE_REPLACEMENT._serialized_end = 7264
-    _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_start = 7266
-    _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_end = 7380
-    _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_start = 7383
-    _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_end = 7642
-    _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_start = 7575
-    _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_end = 7642
-    _WITHCOLUMNS._serialized_start = 7645
-    _WITHCOLUMNS._serialized_end = 7776
-    _HINT._serialized_start = 7779
-    _HINT._serialized_end = 7919
-    _UNPIVOT._serialized_start = 7922
-    _UNPIVOT._serialized_end = 8168
-    _TOSCHEMA._serialized_start = 8170
-    _TOSCHEMA._serialized_end = 8276
-    _REPARTITIONBYEXPRESSION._serialized_start = 8279
-    _REPARTITIONBYEXPRESSION._serialized_end = 8482
+    _RELATION._serialized_start = 165
+    _RELATION._serialized_end = 2377
+    _UNKNOWN._serialized_start = 2379
+    _UNKNOWN._serialized_end = 2388
+    _RELATIONCOMMON._serialized_start = 2390
+    _RELATIONCOMMON._serialized_end = 2439
+    _SQL._serialized_start = 2441
+    _SQL._serialized_end = 2468
+    _READ._serialized_start = 2471
+    _READ._serialized_end = 2897
+    _READ_NAMEDTABLE._serialized_start = 2613
+    _READ_NAMEDTABLE._serialized_end = 2674
+    _READ_DATASOURCE._serialized_start = 2677
+    _READ_DATASOURCE._serialized_end = 2884
+    _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 2815
+    _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 2873
+    _PROJECT._serialized_start = 2899
+    _PROJECT._serialized_end = 3016
+    _FILTER._serialized_start = 3018
+    _FILTER._serialized_end = 3130
+    _JOIN._serialized_start = 3133
+    _JOIN._serialized_end = 3604
+    _JOIN_JOINTYPE._serialized_start = 3396
+    _JOIN_JOINTYPE._serialized_end = 3604
+    _SETOPERATION._serialized_start = 3607
+    _SETOPERATION._serialized_end = 4003
+    _SETOPERATION_SETOPTYPE._serialized_start = 3866
+    _SETOPERATION_SETOPTYPE._serialized_end = 3980
+    _LIMIT._serialized_start = 4005
+    _LIMIT._serialized_end = 4081
+    _OFFSET._serialized_start = 4083
+    _OFFSET._serialized_end = 4162
+    _TAIL._serialized_start = 4164
+    _TAIL._serialized_end = 4239
+    _AGGREGATE._serialized_start = 4242
+    _AGGREGATE._serialized_end = 4824
+    _AGGREGATE_PIVOT._serialized_start = 4581
+    _AGGREGATE_PIVOT._serialized_end = 4692
+    _AGGREGATE_GROUPTYPE._serialized_start = 4695
+    _AGGREGATE_GROUPTYPE._serialized_end = 4824
+    _SORT._serialized_start = 4827
+    _SORT._serialized_end = 4987
+    _DROP._serialized_start = 4989
+    _DROP._serialized_end = 5089
+    _DEDUPLICATE._serialized_start = 5092
+    _DEDUPLICATE._serialized_end = 5263
+    _LOCALRELATION._serialized_start = 5266
+    _LOCALRELATION._serialized_end = 5403
+    _SAMPLE._serialized_start = 5406
+    _SAMPLE._serialized_end = 5701
+    _RANGE._serialized_start = 5704
+    _RANGE._serialized_end = 5849
+    _SUBQUERYALIAS._serialized_start = 5851
+    _SUBQUERYALIAS._serialized_end = 5965
+    _REPARTITION._serialized_start = 5968
+    _REPARTITION._serialized_end = 6110
+    _SHOWSTRING._serialized_start = 6113
+    _SHOWSTRING._serialized_end = 6255
+    _STATSUMMARY._serialized_start = 6257
+    _STATSUMMARY._serialized_end = 6349
+    _STATDESCRIBE._serialized_start = 6351
+    _STATDESCRIBE._serialized_end = 6432
+    _STATCROSSTAB._serialized_start = 6434
+    _STATCROSSTAB._serialized_end = 6535
+    _STATCOV._serialized_start = 6537
+    _STATCOV._serialized_end = 6633
+    _STATCORR._serialized_start = 6636
+    _STATCORR._serialized_end = 6773
+    _NAFILL._serialized_start = 6776
+    _NAFILL._serialized_end = 6910
+    _NADROP._serialized_start = 6913
+    _NADROP._serialized_end = 7047
+    _NAREPLACE._serialized_start = 7050
+    _NAREPLACE._serialized_end = 7346
+    _NAREPLACE_REPLACEMENT._serialized_start = 7205
+    _NAREPLACE_REPLACEMENT._serialized_end = 7346
+    _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_start = 7348
+    _RENAMECOLUMNSBYSAMELENGTHNAMES._serialized_end = 7462
+    _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_start = 7465
+    _RENAMECOLUMNSBYNAMETONAMEMAP._serialized_end = 7724
+    _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_start = 7657
+    _RENAMECOLUMNSBYNAMETONAMEMAP_RENAMECOLUMNSMAPENTRY._serialized_end = 7724
+    _WITHCOLUMNS._serialized_start = 7727
+    _WITHCOLUMNS._serialized_end = 7858
+    _HINT._serialized_start = 7861
+    _HINT._serialized_end = 8001
+    _UNPIVOT._serialized_start = 8004
+    _UNPIVOT._serialized_end = 8250
+    _TOSCHEMA._serialized_start = 8252
+    _TOSCHEMA._serialized_end = 8358
+    _REPARTITIONBYEXPRESSION._serialized_start = 8361
+    _REPARTITIONBYEXPRESSION._serialized_end = 8564
 # @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index 63ccfa18559..36b55ec4f6b 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -35,6 +35,7 @@ limitations under the License.
 """
 import builtins
 import collections.abc
+import google.protobuf.any_pb2
 import google.protobuf.descriptor
 import google.protobuf.internal.containers
 import google.protobuf.internal.enum_type_wrapper
@@ -97,6 +98,7 @@ class Relation(google.protobuf.message.Message):
     COV_FIELD_NUMBER: builtins.int
     CORR_FIELD_NUMBER: builtins.int
     CATALOG_FIELD_NUMBER: builtins.int
+    EXTENSION_FIELD_NUMBER: builtins.int
     UNKNOWN_FIELD_NUMBER: builtins.int
     @property
     def common(self) -> global___RelationCommon: ...
@@ -174,6 +176,11 @@ class Relation(google.protobuf.message.Message):
     def catalog(self) -> pyspark.sql.connect.proto.catalog_pb2.Catalog:
         """Catalog API (experimental / unstable)"""
     @property
+    def extension(self) -> google.protobuf.any_pb2.Any:
+        """This field is used to mark extensions to the protocol. When plugins generate arbitrary
+        relations they can add them here. During the planning the correct resolution is done.
+        """
+    @property
     def unknown(self) -> global___Unknown: ...
     def __init__(
         self,
@@ -214,6 +221,7 @@ class Relation(google.protobuf.message.Message):
         cov: global___StatCov | None = ...,
         corr: global___StatCorr | None = ...,
         catalog: pyspark.sql.connect.proto.catalog_pb2.Catalog | None = ...,
+        extension: google.protobuf.any_pb2.Any | None = ...,
         unknown: global___Unknown | None = ...,
     ) -> None: ...
     def HasField(
@@ -239,6 +247,8 @@ class Relation(google.protobuf.message.Message):
             b"drop",
             "drop_na",
             b"drop_na",
+            "extension",
+            b"extension",
             "fill_na",
             b"fill_na",
             "filter",
@@ -320,6 +330,8 @@ class Relation(google.protobuf.message.Message):
             b"drop",
             "drop_na",
             b"drop_na",
+            "extension",
+            b"extension",
             "fill_na",
             b"fill_na",
             "filter",
@@ -416,6 +428,7 @@ class Relation(google.protobuf.message.Message):
         "cov",
         "corr",
         "catalog",
+        "extension",
         "unknown",
     ] | None: ...
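
The same pattern applies to `Relation.extension` added here; a short sketch under the same assumption (hypothetical `example_plugins_pb2` module), not part of this commit:

```python
# Same idea for relations; `example_plugins_pb2` remains a hypothetical module.
import example_plugins_pb2
import pyspark.sql.connect.proto.relations_pb2 as relations_pb2

rel = relations_pb2.Relation()
rel.extension.Pack(
    example_plugins_pb2.ExamplePluginRelation(custom_field="example")
)

# HasField now reports the new field, matching the stub changes above.
assert rel.HasField("extension")
```
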
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
