This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit af547f31456de9d242298be7f6c26b7fc9ef41bb
Author: forwardxu <[email protected]>
AuthorDate: Sun Jan 18 13:31:26 2026 +0800

    [spark] Support Spark CALL procedure command framework (#2408)
---
 fluss-spark/fluss-spark-common/pom.xml             |  22 ++
 .../FlussSparkSqlParser.g4                         | 230 +++++++++++++++++++++
 .../fluss/spark/FlussSparkSessionExtensions.scala  |  39 ++++
 .../org/apache/fluss/spark/SparkCatalog.scala      |  25 ++-
 .../org/apache/fluss/spark/SparkProcedures.scala   |  40 ++++
 .../fluss/spark/catalog/SupportsProcedures.scala   |  29 +++
 .../fluss/spark/catalog/WithFlussAdmin.scala       |   9 +
 .../catalyst/analysis/FlussProcedureResolver.scala | 189 +++++++++++++++++
 .../catalyst/plans/logical/FlussCallCommand.scala  |  52 +++++
 .../spark/exception/NoSuchProcedureException.scala |  33 +++
 .../fluss/spark/execution/CallProcedureExec.scala  |  95 +++++++++
 .../fluss/spark/execution/FlussStrategy.scala      |  36 ++++
 .../fluss/spark/procedure/BaseProcedure.scala      | 176 ++++++++++++++++
 .../procedure/GetClusterConfigsProcedure.scala     | 140 +++++++++++++
 .../apache/fluss/spark/procedure/Procedure.scala   |  69 +++++++
 .../fluss/spark/procedure/ProcedureBuilder.scala   |  47 +++++
 .../fluss/spark/procedure/ProcedureParameter.scala |  61 ++++++
 .../sql/catalyst/parser/FlussSparkSqlParser.scala  | 142 +++++++++++++
 .../sql/catalyst/parser/FlussSqlAstBuilder.scala   | 145 +++++++++++++
 .../apache/fluss/spark/FlussSparkTestBase.scala    |   1 +
 .../spark/extensions/CallStatementParserTest.scala | 154 ++++++++++++++
 .../procedure/GetClusterConfigsProcedureTest.scala | 121 +++++++++++
 website/blog/2024-11-29-fluss-open-source.md       |   2 +-
 website/blog/2025-06-01-partial-updates.md         |   4 +-
 website/blog/releases/0.6.md                       |   2 +-
 website/blog/releases/0.7.md                       |   8 +-
 website/blog/releases/0.8.md                       |  16 +-
 website/docs/engine-spark/_category_.json          |   4 +
 website/docs/engine-spark/procedures.md            | 107 ++++++++++
 website/docs/maintenance/operations/rebalance.md   |   2 +-
 .../tiered-storage/lakehouse-storage.md            |   2 +-
 website/src/pages/index.tsx                        |   2 +-
 32 files changed, 1983 insertions(+), 21 deletions(-)

diff --git a/fluss-spark/fluss-spark-common/pom.xml 
b/fluss-spark/fluss-spark-common/pom.xml
index e285b8bbc..95b7c3bfb 100644
--- a/fluss-spark/fluss-spark-common/pom.xml
+++ b/fluss-spark/fluss-spark-common/pom.xml
@@ -41,10 +41,32 @@
             <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>antlr4-runtime</artifactId>
+            <version>4.9.3</version>
+        </dependency>
     </dependencies>
 
     <build>
         <plugins>
+            <plugin>
+                <groupId>org.antlr</groupId>
+                <artifactId>antlr4-maven-plugin</artifactId>
+                <version>4.9.3</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>antlr4</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <visitor>true</visitor>
+                </configuration>
+            </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
diff --git 
a/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSparkSqlParser.g4
 
b/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSparkSqlParser.g4
new file mode 100644
index 000000000..d377e677c
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSparkSqlParser.g4
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+grammar FlussSparkSqlParser;
+
+@lexer::members {
+  public boolean isValidDecimal() {
+    int nextChar = _input.LA(1);
+    if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= 
'9' ||
+      nextChar == '_') {
+      return false;
+    } else {
+      return true;
+    }
+  }
+
+  public boolean isHint() {
+    int nextChar = _input.LA(1);
+    if (nextChar == '+') {
+      return true;
+    } else {
+      return false;
+    }
+  }
+}
+
+singleStatement
+    : statement ';'* EOF
+    ;
+
+statement
+    : CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')'  
#call
+    ;
+
+callArgument
+    : expression                    #positionalArgument
+    | identifier '=>' expression    #namedArgument
+    ;
+
+expression
+    : booleanExpression
+    ;
+
+booleanExpression
+    : predicated
+    | NOT booleanExpression
+    | EXISTS '(' query ')'
+    | booleanExpression AND booleanExpression
+    | booleanExpression OR booleanExpression
+    ;
+
+predicated
+    : valueExpression
+    ;
+
+valueExpression
+    : primaryExpression
+    ;
+
+primaryExpression
+    : constant                                                        
#constantDefault
+    | functionName '(' (expression (',' expression)*)? ')'            
#functionCall
+    | '(' expression ')'                                              
#parenthesizedExpression
+    ;
+
+functionName
+    : multipartIdentifier
+    | identifier
+    ;
+
+query
+    : .+?
+    ;
+
+constant
+    : number                          #numericLiteral
+    | booleanValue                    #booleanLiteral
+    | STRING+                         #stringLiteral
+    | identifier STRING               #typeConstructor
+    ;
+
+booleanValue
+    : TRUE | FALSE
+    ;
+
+number
+    : MINUS? EXPONENT_VALUE           #exponentLiteral
+    | MINUS? DECIMAL_VALUE            #decimalLiteral
+    | MINUS? INTEGER_VALUE            #integerLiteral
+    | MINUS? BIGINT_LITERAL           #bigIntLiteral
+    | MINUS? SMALLINT_LITERAL         #smallIntLiteral
+    | MINUS? TINYINT_LITERAL          #tinyIntLiteral
+    | MINUS? DOUBLE_LITERAL           #doubleLiteral
+    | MINUS? FLOAT_LITERAL            #floatLiteral
+    | MINUS? BIGDECIMAL_LITERAL       #bigDecimalLiteral
+    ;
+
+multipartIdentifier
+    : parts+=identifier ('.' parts+=identifier)*
+    ;
+
+identifier
+    : IDENTIFIER              #unquotedIdentifier
+    | quotedIdentifier        #quotedIdentifierAlternative
+    | nonReserved             #unquotedIdentifier
+    ;
+
+quotedIdentifier
+    : BACKQUOTED_IDENTIFIER
+    ;
+
+nonReserved
+    : CALL | TRUE | FALSE | NOT | AND | OR | EXISTS
+    ;
+
+// Keywords
+CALL: 'CALL';
+TRUE: 'TRUE';
+FALSE: 'FALSE';
+NOT: 'NOT';
+AND: 'AND';
+OR: 'OR';
+EXISTS: 'EXISTS';
+
+// Operators
+MINUS: '-';
+
+// Literals
+STRING
+    : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
+    | '"' ( ~('"'|'\\') | ('\\' .) )* '"'
+    ;
+
+BIGINT_LITERAL
+    : INTEGER_VALUE 'L'
+    ;
+
+SMALLINT_LITERAL
+    : INTEGER_VALUE 'S'
+    ;
+
+TINYINT_LITERAL
+    : INTEGER_VALUE 'Y'
+    ;
+
+INTEGER_VALUE
+    : DIGIT+
+    ;
+
+EXPONENT_VALUE
+    : DIGIT+ EXPONENT
+    | DECIMAL_DIGITS EXPONENT {isValidDecimal()}?
+    ;
+
+DECIMAL_VALUE
+    : DECIMAL_DIGITS {isValidDecimal()}?
+    ;
+
+FLOAT_LITERAL
+    : DIGIT+ EXPONENT? 'F'
+    | DECIMAL_DIGITS EXPONENT? 'F' {isValidDecimal()}?
+    ;
+
+DOUBLE_LITERAL
+    : DIGIT+ EXPONENT? 'D'
+    | DECIMAL_DIGITS EXPONENT? 'D' {isValidDecimal()}?
+    ;
+
+BIGDECIMAL_LITERAL
+    : DIGIT+ EXPONENT? 'BD'
+    | DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}?
+    ;
+
+IDENTIFIER
+    : (LETTER | DIGIT | '_')+
+    ;
+
+BACKQUOTED_IDENTIFIER
+    : '`' ( ~'`' | '``' )* '`'
+    ;
+
+fragment DECIMAL_DIGITS
+    : DIGIT+ '.' DIGIT*
+    | '.' DIGIT+
+    ;
+
+fragment EXPONENT
+    : 'E' [+-]? DIGIT+
+    ;
+
+fragment DIGIT
+    : [0-9]
+    ;
+
+fragment LETTER
+    : [A-Z]
+    ;
+
+// Whitespace and comments
+SIMPLE_COMMENT
+    : '--' ('\\\n' | ~[\r\n])* '\r'? '\n'? -> channel(HIDDEN)
+    ;
+
+BRACKETED_COMMENT
+    : '/*' {!isHint()}? (BRACKETED_COMMENT|.)*? '*/' -> channel(HIDDEN)
+    ;
+
+WS
+    : [ \r\n\t]+ -> channel(HIDDEN)
+    ;
+
+// Catch-all for any characters we didn't match
+UNRECOGNIZED
+    : .
+    ;
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/FlussSparkSessionExtensions.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/FlussSparkSessionExtensions.scala
new file mode 100644
index 000000000..dc12ed503
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/FlussSparkSessionExtensions.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.spark.catalyst.analysis.FlussProcedureResolver
+import org.apache.fluss.spark.execution.FlussStrategy
+
+import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.sql.catalyst.parser.FlussSparkSqlParser
+
+/** Spark session extensions for Fluss. */
+class FlussSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
+
+  override def apply(extensions: SparkSessionExtensions): Unit = {
+    // parser extensions
+    extensions.injectParser { case (_, parser) => new 
FlussSparkSqlParser(parser) }
+
+    // analyzer extensions
+    extensions.injectResolutionRule(spark => FlussProcedureResolver(spark))
+
+    // planner extensions
+    extensions.injectPlannerStrategy(spark => FlussStrategy(spark))
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
index 045063e4f..53426a4d1 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
@@ -19,7 +19,9 @@ package org.apache.fluss.spark
 
 import org.apache.fluss.exception.{DatabaseNotExistException, 
TableAlreadyExistException, TableNotExistException}
 import org.apache.fluss.metadata.TablePath
-import org.apache.fluss.spark.catalog.{SupportsFlussNamespaces, WithFlussAdmin}
+import org.apache.fluss.spark.catalog.{SupportsFlussNamespaces, 
SupportsProcedures, WithFlussAdmin}
+import org.apache.fluss.spark.exception.NoSuchProcedureException
+import org.apache.fluss.spark.procedure.{Procedure, ProcedureBuilder}
 
 import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, 
NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, 
TableCatalog, TableChange}
@@ -32,9 +34,14 @@ import java.util.concurrent.ExecutionException
 
 import scala.collection.JavaConverters._
 
-class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with 
WithFlussAdmin {
+class SparkCatalog
+  extends TableCatalog
+  with SupportsFlussNamespaces
+  with WithFlussAdmin
+  with SupportsProcedures {
 
   private var catalogName: String = "fluss"
+  private val SYSTEM_NAMESPACE = "sys"
 
   override def listTables(namespace: Array[String]): Array[Identifier] = {
     doNamespaceOperator(namespace) {
@@ -116,6 +123,20 @@ class SparkCatalog extends TableCatalog with 
SupportsFlussNamespaces with WithFl
 
   override def name(): String = catalogName
 
+  override def loadProcedure(identifier: Identifier): Procedure = {
+    if (isSystemNamespace(identifier.namespace)) {
+      val builder: ProcedureBuilder = 
SparkProcedures.newBuilder(identifier.name)
+      if (builder != null) {
+        return builder.withTableCatalog(this).build()
+      }
+    }
+    throw new NoSuchProcedureException(s"Procedure not found: $identifier")
+  }
+
+  private def isSystemNamespace(namespace: Array[String]): Boolean = {
+    namespace.length == 1 && namespace(0).equalsIgnoreCase(SYSTEM_NAMESPACE)
+  }
+
   private def toTablePath(ident: Identifier): TablePath = {
     assert(ident.namespace().length == 1, "Only single namespace is supported")
     TablePath.of(ident.namespace().head, ident.name)
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala
new file mode 100644
index 000000000..12a4b0582
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.spark.procedure.{GetClusterConfigsProcedure, 
ProcedureBuilder}
+
+import java.util.Locale
+
+object SparkProcedures {
+
+  private val BUILDERS: Map[String, () => ProcedureBuilder] = 
initProcedureBuilders()
+
+  def newBuilder(name: String): ProcedureBuilder = {
+    val builderSupplier = BUILDERS.get(name.toLowerCase(Locale.ROOT))
+    builderSupplier.map(_()).orNull
+  }
+
+  def names(): Set[String] = BUILDERS.keySet
+
+  private def initProcedureBuilders(): Map[String, () => ProcedureBuilder] = {
+    Map(
+      "get_cluster_configs" -> (() => GetClusterConfigsProcedure.builder())
+    )
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsProcedures.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsProcedures.scala
new file mode 100644
index 000000000..cbc6adb1b
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsProcedures.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.catalog
+
+import org.apache.fluss.spark.exception.NoSuchProcedureException
+import org.apache.fluss.spark.procedure.Procedure
+
+import org.apache.spark.sql.connector.catalog.Identifier
+
+trait SupportsProcedures {
+
+  @throws[NoSuchProcedureException]
+  def loadProcedure(identifier: Identifier): Procedure
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
index 2241b6b96..3e7653cdf 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
@@ -59,6 +59,15 @@ trait WithFlussAdmin extends AutoCloseable {
     _admin
   }
 
+  /**
+   * Gets the Fluss Admin client. This is a public accessor for procedures and 
other external
+   * components that need admin access.
+   *
+   * @return
+   *   the Admin instance
+   */
+  def getAdmin: Admin = admin
+
   override def close(): Unit = {
     IOUtils.closeQuietly(_admin, "fluss-admin")
     IOUtils.closeQuietly(_connection, "fluss-connection");
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/analysis/FlussProcedureResolver.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/analysis/FlussProcedureResolver.scala
new file mode 100644
index 000000000..a4aa96e3d
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/analysis/FlussProcedureResolver.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.catalyst.analysis
+
+import org.apache.fluss.spark.catalog.SupportsProcedures
+import org.apache.fluss.spark.catalyst.plans.logical._
+import org.apache.fluss.spark.procedure.ProcedureParameter
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier}
+
+import java.util.Locale
+
+/** Resolution rule for Fluss stored procedures. */
+case class FlussProcedureResolver(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    case FlussCallStatement(nameParts, arguments) if nameParts.nonEmpty =>
+      val (catalog, identifier) = resolveCatalogAndIdentifier(nameParts)
+      if (catalog == null || !catalog.isInstanceOf[SupportsProcedures]) {
+        throw new RuntimeException(s"Catalog ${nameParts.head} is not a 
SupportsProcedures")
+      }
+
+      val procedureCatalog = catalog.asInstanceOf[SupportsProcedures]
+      val procedure = procedureCatalog.loadProcedure(identifier)
+      val parameters = procedure.parameters
+      val normalizedParameters = normalizeParameters(parameters)
+      validateParameters(normalizedParameters)
+      val normalizedArguments = normalizeArguments(arguments)
+      FlussCallCommand(
+        procedure,
+        args = buildArgumentExpressions(normalizedParameters, 
normalizedArguments))
+
+    case call @ FlussCallCommand(procedure, arguments) if call.resolved =>
+      val parameters = procedure.parameters
+      val newArguments = arguments.zipWithIndex.map {
+        case (argument, index) =>
+          val parameter = parameters(index)
+          val parameterType = parameter.dataType
+          val argumentType = argument.dataType
+          if (parameterType != argumentType && !Cast.canUpCast(argumentType, 
parameterType)) {
+            throw new RuntimeException(
+              s"Cannot cast $argumentType to $parameterType of 
${parameter.name}.")
+          }
+          if (parameterType != argumentType) {
+            Cast(argument, parameterType)
+          } else {
+            argument
+          }
+      }
+
+      if (newArguments != arguments) {
+        call.copy(args = newArguments)
+      } else {
+        call
+      }
+  }
+
+  private def resolveCatalogAndIdentifier(nameParts: Seq[String]): 
(CatalogPlugin, Identifier) = {
+    val catalogManager = sparkSession.sessionState.catalogManager
+    if (nameParts.length == 2) {
+      val catalogName = nameParts.head
+      val procedureName = nameParts(1)
+      val catalog = catalogManager.catalog(catalogName)
+      (catalog, Identifier.of(Array("sys"), procedureName))
+    } else if (nameParts.length == 3) {
+      val catalogName = nameParts.head
+      val namespace = nameParts(1)
+      val procedureName = nameParts(2)
+      val catalog = catalogManager.catalog(catalogName)
+      (catalog, Identifier.of(Array(namespace), procedureName))
+    } else {
+      throw new RuntimeException(s"Invalid procedure name: 
${nameParts.mkString(".")}")
+    }
+  }
+
+  private def normalizeParameters(parameters: Seq[ProcedureParameter]): 
Seq[ProcedureParameter] = {
+    parameters.map {
+      parameter =>
+        val normalizedName = parameter.name.toLowerCase(Locale.ROOT)
+        if (parameter.isRequired) {
+          ProcedureParameter.required(normalizedName, parameter.dataType)
+        } else {
+          ProcedureParameter.optional(normalizedName, parameter.dataType)
+        }
+    }
+  }
+
+  private def validateParameters(parameters: Seq[ProcedureParameter]): Unit = {
+    val duplicateParamNames = parameters.groupBy(_.name).collect {
+      case (name, matchingParams) if matchingParams.length > 1 => name
+    }
+    if (duplicateParamNames.nonEmpty) {
+      throw new RuntimeException(
+        s"Parameter names ${duplicateParamNames.mkString("[", ",", "]")} are 
duplicated.")
+    }
+    parameters.sliding(2).foreach {
+      case Seq(previousParam, currentParam)
+          if !previousParam.isRequired && currentParam.isRequired =>
+        throw new RuntimeException(
+          s"Optional parameters should be after required ones but 
$currentParam is after $previousParam.")
+      case _ =>
+    }
+  }
+
+  private def normalizeArguments(arguments: Seq[FlussCallArgument]): 
Seq[FlussCallArgument] = {
+    arguments.map {
+      case a @ FlussNamedArgument(name, _) => a.copy(name = 
name.toLowerCase(Locale.ROOT))
+      case other => other
+    }
+  }
+
+  private def buildArgumentExpressions(
+      parameters: Seq[ProcedureParameter],
+      arguments: Seq[FlussCallArgument]): Seq[Expression] = {
+    val nameToPositionMap = parameters.map(_.name).zipWithIndex.toMap
+    val nameToArgumentMap = buildNameToArgumentMap(parameters, arguments, 
nameToPositionMap)
+    val missingParamNames = parameters.filter(_.isRequired).collect {
+      case parameter if !nameToArgumentMap.contains(parameter.name) => 
parameter.name
+    }
+    if (missingParamNames.nonEmpty) {
+      throw new RuntimeException(
+        s"Required parameters ${missingParamNames.mkString("[", ",", "]")} are 
missing.")
+    }
+    val argumentExpressions = new Array[Expression](parameters.size)
+    nameToArgumentMap.foreach {
+      case (name, argument) => argumentExpressions(nameToPositionMap(name)) = 
argument.expr
+    }
+    parameters.foreach {
+      case parameter if !parameter.isRequired && 
!nameToArgumentMap.contains(parameter.name) =>
+        argumentExpressions(nameToPositionMap(parameter.name)) =
+          Literal.create(null, parameter.dataType)
+      case _ =>
+    }
+    argumentExpressions.toSeq
+  }
+
+  private def buildNameToArgumentMap(
+      parameters: Seq[ProcedureParameter],
+      arguments: Seq[FlussCallArgument],
+      nameToPositionMap: Map[String, Int]): Map[String, FlussCallArgument] = {
+    val isNamedArgument = arguments.exists(_.isInstanceOf[FlussNamedArgument])
+    val isPositionalArgument = 
arguments.exists(_.isInstanceOf[FlussPositionalArgument])
+
+    if (isNamedArgument && isPositionalArgument) {
+      throw new RuntimeException("Cannot mix named and positional arguments.")
+    }
+
+    if (isNamedArgument) {
+      val namedArguments = arguments.asInstanceOf[Seq[FlussNamedArgument]]
+      val validationErrors = namedArguments.groupBy(_.name).collect {
+        case (name, procedureArguments) if procedureArguments.size > 1 =>
+          s"Procedure argument $name is duplicated."
+        case (name, _) if !nameToPositionMap.contains(name) => s"Argument 
$name is unknown."
+      }
+      if (validationErrors.nonEmpty) {
+        throw new RuntimeException(s"Invalid arguments: 
${validationErrors.mkString(", ")}")
+      }
+      namedArguments.map(arg => arg.name -> arg).toMap
+    } else {
+      if (arguments.size > parameters.size) {
+        throw new RuntimeException("Too many arguments for procedure")
+      }
+      arguments.zipWithIndex.map {
+        case (argument, position) =>
+          val param = parameters(position)
+          param.name -> argument
+      }.toMap
+    }
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/plans/logical/FlussCallCommand.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/plans/logical/FlussCallCommand.scala
new file mode 100644
index 000000000..f0ea0265d
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/plans/logical/FlussCallCommand.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.catalyst.plans.logical
+
+import org.apache.fluss.spark.procedure.Procedure
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.LeafCommand
+import org.apache.spark.sql.catalyst.util.truncatedString
+
+/** A CALL statement that needs to be resolved to a procedure. */
+case class FlussCallStatement(name: Seq[String], args: Seq[FlussCallArgument]) 
extends LeafCommand {
+  override def output: Seq[Attribute] = Seq.empty
+}
+
+/** Base trait for CALL statement arguments. */
+sealed trait FlussCallArgument {
+  def expr: Expression
+}
+
+/** A positional argument in a stored procedure call. */
+case class FlussPositionalArgument(expr: Expression) extends FlussCallArgument
+
+/** A named argument in a stored procedure call. */
+case class FlussNamedArgument(name: String, expr: Expression) extends 
FlussCallArgument
+
+/** A CALL command that has been resolved to a specific procedure. */
+case class FlussCallCommand(procedure: Procedure, args: Seq[Expression]) 
extends LeafCommand {
+
+  override lazy val output: Seq[Attribute] =
+    procedure.outputType.map(
+      field => AttributeReference(field.name, field.dataType, field.nullable, 
field.metadata)())
+
+  override def simpleString(maxFields: Int): String = {
+    s"Call${truncatedString(output, "[", ", ", "]", maxFields)} 
${procedure.description}"
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/exception/NoSuchProcedureException.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/exception/NoSuchProcedureException.scala
new file mode 100644
index 000000000..c27adf422
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/exception/NoSuchProcedureException.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.exception
+
+/**
+ * Exception for trying to execute a procedure that doesn't exist.
+ *
+ * @param message
+ *   the error message
+ * @param cause
+ *   the underlying cause (optional)
+ * @since 0.9
+ */
+class NoSuchProcedureException(message: String, cause: Throwable)
+  extends RuntimeException(message, cause) {
+
+  def this(message: String) = this(message, null)
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/CallProcedureExec.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/CallProcedureExec.scala
new file mode 100644
index 000000000..7af4ca3ec
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/CallProcedureExec.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.execution
+
+import org.apache.fluss.spark.procedure.Procedure
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
GenericInternalRow}
+import org.apache.spark.sql.execution.LeafExecNode
+
+/**
+ * Physical plan node for executing a stored procedure.
+ *
+ * This implementation extends LeafExecNode for simpler command-style 
execution without requiring
+ * RDD-based distributed processing. This approach is more appropriate for 
procedures which are
+ * typically synchronous administrative operations.
+ *
+ * Benefits over SparkPlan-based implementation:
+ *   - No need to create and manage RDDs explicitly
+ *   - Simpler code without parallelize() overhead
+ *   - Better semantic match for command-style operations
+ *   - Consistent with other catalog implementations (e.g., Apache Paimon)
+ *
+ * @param output
+ *   the output attributes of the procedure result
+ * @param procedure
+ *   the procedure to execute
+ * @param args
+ *   the argument expressions to pass to the procedure
+ */
/**
 * Physical plan node that runs a stored procedure.
 *
 * Implemented as a [[LeafExecNode]] because procedure calls are synchronous,
 * driver-side administrative commands: there is no need for RDD-based
 * distributed processing, and command-style execution keeps the node simple
 * and consistent with other catalog implementations (e.g. Apache Paimon).
 *
 * @param output    output attributes describing the procedure's result schema
 * @param procedure the procedure to invoke
 * @param args      argument expressions passed to the procedure
 */
case class CallProcedureExec(output: Seq[Attribute], procedure: Procedure, args: Seq[Expression])
  extends LeafExecNode {

  override def simpleString(maxFields: Int): String =
    s"CallProcedure ${procedure.description()}"

  /**
   * Primary execution path: evaluate the argument expressions on the driver,
   * pack them into a single row, and invoke the procedure directly — no RDD
   * creation overhead.
   */
  override def executeCollect(): Array[InternalRow] = {
    // Arguments do not reference any input row, so eval(null) is sufficient.
    val evaluatedArgs: Array[Any] = args.map(_.eval(null)).toArray
    procedure.call(new GenericInternalRow(evaluatedArgs))
  }

  override def executeTake(limit: Int): Array[InternalRow] =
    executeCollect().take(limit)

  override def executeTail(limit: Int): Array[InternalRow] =
    executeCollect().takeRight(limit)

  /**
   * Required by the SparkPlan interface; wraps the driver-side results in a
   * single-partition RDD purely for compatibility with the execution framework.
   */
  override protected def doExecute(): org.apache.spark.rdd.RDD[InternalRow] =
    sparkContext.parallelize(executeCollect(), 1)

  // Leaf node: there are never any children to replace.
  override def withNewChildrenInternal(
      newChildren: IndexedSeq[org.apache.spark.sql.execution.SparkPlan]): CallProcedureExec =
    copy()
}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/FlussStrategy.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/FlussStrategy.scala
new file mode 100644
index 000000000..e4d3d4117
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/FlussStrategy.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.execution
+
+import org.apache.fluss.spark.catalyst.plans.logical.FlussCallCommand
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+
+/** Execution strategy for Fluss procedure calls. */
/** Planner strategy that lowers Fluss procedure-call logical plans into physical nodes. */
case class FlussStrategy(spark: SparkSession) extends SparkStrategy {

  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
    plan match {
      // Only FlussCallCommand is handled here; everything else is left to
      // Spark's built-in strategies.
      case cmd: FlussCallCommand =>
        Seq(CallProcedureExec(cmd.output, cmd.procedure, cmd.args))
      case _ =>
        Seq.empty
    }
  }
}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala
new file mode 100644
index 000000000..f7f07b475
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.procedure
+
+import org.apache.fluss.client.admin.Admin
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.spark.SparkTable
+import org.apache.fluss.spark.catalog.{AbstractSparkTable, WithFlussAdmin}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
+
+/**
+ * Base class for Fluss stored procedures.
+ *
+ * This class provides common utility methods for procedure implementations, 
including identifier
+ * parsing, table loading, admin access, and result row construction.
+ *
+ * @param tableCatalog
+ *   the Spark catalog that owns this procedure
+ */
/**
 * Base class for Fluss stored procedures.
 *
 * Provides common utilities for procedure implementations: identifier parsing,
 * table loading, admin-client access, and result-row construction.
 *
 * @param tableCatalog
 *   the Spark catalog that owns this procedure
 */
abstract class BaseProcedure(tableCatalog: TableCatalog) extends Procedure {

  /**
   * Converts a string identifier to a Spark Identifier object.
   *
   * A single-part name is resolved against the session's current namespace;
   * a two-part name is interpreted as "namespace.table".
   *
   * @param identifierAsString
   *   the identifier string (e.g., "db.table" or "table")
   * @param argName
   *   the parameter name for error reporting
   * @return
   *   the Spark Identifier
   */
  protected def toIdentifier(identifierAsString: String, argName: String): Identifier = {
    if (identifierAsString == null || identifierAsString.isEmpty) {
      throw new IllegalArgumentException(s"Cannot handle an empty identifier for argument $argName")
    }

    val spark = SparkSession.active
    identifierAsString.split("\\.") match {
      case Array(table) =>
        // No namespace given: fall back to the session's current namespace.
        Identifier.of(spark.sessionState.catalogManager.currentNamespace, table)
      case Array(namespace, table) =>
        Identifier.of(Array(namespace), table)
      case _ =>
        // Three or more parts are not supported by Fluss table paths.
        throw new IllegalArgumentException(
          s"Invalid identifier format for argument $argName: $identifierAsString")
    }
  }

  /**
   * Loads a Spark table from the catalog.
   *
   * @param ident
   *   the table identifier
   * @return
   *   the SparkTable instance
   * @throws RuntimeException
   *   if the catalog lookup itself fails
   * @throws IllegalArgumentException
   *   if the loaded table is not a Fluss table
   */
  protected def loadSparkTable(ident: Identifier): SparkTable = {
    // Only wrap the catalog lookup. Previously the "not a Fluss table"
    // IllegalArgumentException was thrown inside the try and re-wrapped into a
    // misleading "Couldn't load table" RuntimeException; keeping the type
    // check outside the try preserves the precise error for callers.
    val table: Table =
      try {
        tableCatalog.loadTable(ident)
      } catch {
        case e: Exception =>
          val errMsg = s"Couldn't load table '$ident' in catalog '${tableCatalog.name()}'"
          throw new RuntimeException(errMsg, e)
      }
    table match {
      case sparkTable: SparkTable => sparkTable
      case other =>
        throw new IllegalArgumentException(
          s"$ident is not a Fluss table: ${other.getClass.getName}")
    }
  }

  /**
   * Gets the Fluss Admin client from the catalog.
   *
   * @return
   *   the Admin instance
   * @throws IllegalStateException
   *   if the catalog does not implement [[WithFlussAdmin]]
   */
  protected def getAdmin(): Admin = {
    tableCatalog match {
      case withAdmin: WithFlussAdmin => withAdmin.getAdmin
      case _ =>
        throw new IllegalStateException(
          s"Catalog does not support Fluss admin: ${tableCatalog.getClass.getName}")
    }
  }

  /**
   * Gets the Fluss Admin client from a SparkTable.
   *
   * @param table
   *   the SparkTable instance
   * @return
   *   the Admin instance
   * @throws IllegalArgumentException
   *   if the table is not an [[AbstractSparkTable]]
   */
  protected def getAdmin(table: SparkTable): Admin = {
    table match {
      case abstractTable: AbstractSparkTable => abstractTable.admin
      case _ =>
        throw new IllegalArgumentException(
          s"Table is not an AbstractSparkTable: ${table.getClass.getName}")
    }
  }

  /**
   * Creates a new InternalRow with the given values.
   *
   * @param values
   *   the values for the row
   * @return
   *   the InternalRow instance
   */
  protected def newInternalRow(values: Any*): InternalRow = {
    new GenericInternalRow(values.toArray)
  }

  /**
   * Converts a Spark Identifier to a Fluss TablePath.
   *
   * @param ident
   *   the Spark identifier; must have exactly one namespace level
   * @return
   *   the TablePath instance
   */
  protected def toTablePath(ident: Identifier): TablePath = {
    if (ident.namespace().length != 1) {
      throw new IllegalArgumentException("Only single namespace is supported")
    }
    TablePath.of(ident.namespace()(0), ident.name())
  }
}
+
object BaseProcedure {

  /**
   * Abstract builder for [[BaseProcedure]] implementations.
   *
   * Captures the owning table catalog, then delegates the actual construction
   * to the concrete subclass via `doBuild()`.
   *
   * @tparam T
   *   the concrete procedure type to build
   */
  abstract class Builder[T <: BaseProcedure] extends ProcedureBuilder {
    private var tableCatalog: TableCatalog = _

    override def withTableCatalog(newTableCatalog: TableCatalog): Builder[T] = {
      tableCatalog = newTableCatalog
      this
    }

    override def build(): T = doBuild()

    /** Implemented by concrete builders to construct the procedure instance. */
    protected def doBuild(): T

    protected def getTableCatalog: TableCatalog = tableCatalog
  }
}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedure.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedure.scala
new file mode 100644
index 000000000..cdf3403da
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedure.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.procedure
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+
+/**
+ * Procedure to get cluster configuration(s).
+ *
+ * This procedure allows querying dynamic cluster configurations. It can 
retrieve:
+ *   - Specific configurations by key(s)
+ *   - All configurations (when no keys are provided)
+ *
+ * Usage examples:
+ * {{{
+ * -- Get a specific configuration
+ * CALL sys.get_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec')
+ *
+ * -- Get all cluster configurations
+ * CALL sys.get_cluster_configs()
+ * }}}
+ */
/**
 * Procedure to get cluster configuration(s).
 *
 * Retrieves either the configurations matching the supplied key(s), or every
 * cluster configuration when no keys are provided.
 *
 * Usage examples:
 * {{{
 * -- Get a specific configuration
 * CALL sys.get_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec')
 *
 * -- Get all cluster configurations
 * CALL sys.get_cluster_configs()
 * }}}
 */
class GetClusterConfigsProcedure(tableCatalog: TableCatalog) extends BaseProcedure(tableCatalog) {

  override def parameters(): Array[ProcedureParameter] =
    GetClusterConfigsProcedure.PARAMETERS

  override def outputType(): StructType =
    GetClusterConfigsProcedure.OUTPUT_TYPE

  override def call(args: InternalRow): Array[InternalRow] = {
    // An absent or null first argument means "return all configurations".
    val requestedKeys: Array[String] =
      if (args.numFields > 0 && !args.isNullAt(0)) {
        val keysArray = args.getArray(0)
        Array.tabulate(keysArray.numElements())(i => keysArray.getUTF8String(i).toString)
      } else {
        Array.empty[String]
      }

    fetchConfigs(requestedKeys)
  }

  /** One output row per config entry: (key, value, formatted source). */
  private def toRow(entry: org.apache.fluss.config.cluster.ConfigEntry): InternalRow =
    newInternalRow(
      UTF8String.fromString(entry.key()),
      UTF8String.fromString(entry.value()),
      UTF8String.fromString(formatConfigSource(entry.source())))

  private def fetchConfigs(requestedKeys: Array[String]): Array[InternalRow] = {
    try {
      val entries = getAdmin().describeClusterConfigs().get().asScala

      if (requestedKeys.isEmpty) {
        entries.map(toRow).toArray
      } else {
        // Keys that are not present in the cluster are silently skipped,
        // matching the original lookup-and-flatten behavior.
        val entriesByKey = entries.map(e => e.key() -> e).toMap
        requestedKeys.flatMap(key => entriesByKey.get(key).map(toRow))
      }
    } catch {
      case e: Exception =>
        throw new RuntimeException(s"Failed to get cluster config: ${e.getMessage}", e)
    }
  }

  /** Maps internal source names to the user-facing DYNAMIC/STATIC labels. */
  private def formatConfigSource(
      source: org.apache.fluss.config.cluster.ConfigEntry.ConfigSource): String =
    Option(source).map(_.name()) match {
      case None => "UNKNOWN"
      case Some("DYNAMIC_SERVER_CONFIG") => "DYNAMIC"
      case Some("INITIAL_SERVER_CONFIG") => "STATIC"
      case Some(other) => other
    }

  override def description(): String = {
    "Retrieve cluster configuration values."
  }
}
+
object GetClusterConfigsProcedure {

  // Single optional parameter: an array of configuration keys to look up.
  private val PARAMETERS: Array[ProcedureParameter] = Array(
    ProcedureParameter.optional("config_keys", DataTypes.createArrayType(DataTypes.StringType)))

  // Result schema: one row per configuration entry.
  private val OUTPUT_TYPE: StructType = new StructType(
    Array(
      new StructField("config_key", DataTypes.StringType, nullable = false, Metadata.empty),
      new StructField("config_value", DataTypes.StringType, nullable = false, Metadata.empty),
      new StructField("config_source", DataTypes.StringType, nullable = false, Metadata.empty)))

  /** Builder used by the catalog to instantiate this procedure. */
  def builder(): ProcedureBuilder =
    new BaseProcedure.Builder[GetClusterConfigsProcedure]() {
      override protected def doBuild(): GetClusterConfigsProcedure =
        new GetClusterConfigsProcedure(getTableCatalog)
    }
}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/Procedure.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/Procedure.scala
new file mode 100644
index 000000000..81faef57f
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/Procedure.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.procedure
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Interface for Fluss stored procedures that can be invoked via Spark SQL 
CALL statements.
+ *
+ * Procedures allow users to perform administrative and management operations 
through Spark SQL. All
+ * procedures are located in the `sys` namespace.
+ *
+ * Example usage:
+ * {{{
+ * CALL catalog.sys.procedure_name(parameter => 'value')
+ * }}}
+ */
trait Procedure {

  /**
   * Returns the parameters accepted by this procedure.
   *
   * @return
   *   array of procedure parameters, in positional order
   */
  def parameters(): Array[ProcedureParameter]

  /**
   * Returns the output schema of this procedure.
   *
   * @return
   *   the output StructType describing the result rows
   */
  def outputType(): StructType

  /**
   * Executes the procedure with the given arguments.
   *
   * @param args
   *   the argument values as an InternalRow, ordered to match [[parameters]]
   * @return
   *   array of result rows conforming to [[outputType]]
   */
  def call(args: InternalRow): Array[InternalRow]

  /**
   * Returns a human-readable description of this procedure.
   *
   * Defaults to the implementing class' string representation; implementations
   * should override this with a meaningful summary.
   *
   * @return
   *   the procedure description
   */
  def description(): String = getClass.toString
}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureBuilder.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureBuilder.scala
new file mode 100644
index 000000000..b017edf7c
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureBuilder.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.procedure
+
+import org.apache.spark.sql.connector.catalog.TableCatalog
+
+/**
+ * Builder interface for creating Procedure instances.
+ *
+ * Procedure builders are used by the catalog to instantiate procedures with 
the necessary
+ * dependencies.
+ */
trait ProcedureBuilder {

  /**
   * Sets the table catalog that the procedure will use.
   *
   * @param tableCatalog
   *   the catalog instance that owns the procedure
   * @return
   *   this builder for method chaining
   */
  def withTableCatalog(tableCatalog: TableCatalog): ProcedureBuilder

  /**
   * Builds and returns the procedure instance.
   *
   * Call after [[withTableCatalog]] has supplied the required catalog.
   *
   * @return
   *   the constructed Procedure
   */
  def build(): Procedure
}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureParameter.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureParameter.scala
new file mode 100644
index 000000000..0c48e7631
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureParameter.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.procedure
+
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Represents a parameter for a Fluss stored procedure.
+ *
+ * @param name
+ *   the parameter name
+ * @param dataType
+ *   the Spark SQL data type of the parameter
+ * @param isRequired
+ *   whether this parameter is required
+ */
/**
 * Represents a parameter for a Fluss stored procedure.
 *
 * @param name
 *   the parameter name
 * @param dataType
 *   the Spark SQL data type of the parameter
 * @param isRequired
 *   whether this parameter must be supplied by the caller
 */
case class ProcedureParameter(name: String, dataType: DataType, isRequired: Boolean = false)

object ProcedureParameter {

  /**
   * Creates a required procedure parameter.
   *
   * @param name
   *   the parameter name
   * @param dataType
   *   the parameter data type
   * @return
   *   a required ProcedureParameter
   */
  def required(name: String, dataType: DataType): ProcedureParameter = {
    ProcedureParameter(name, dataType, isRequired = true)
  }

  /**
   * Creates an optional procedure parameter.
   *
   * @param name
   *   the parameter name
   * @param dataType
   *   the parameter data type
   * @return
   *   an optional ProcedureParameter
   */
  def optional(name: String, dataType: DataType): ProcedureParameter = {
    ProcedureParameter(name, dataType, isRequired = false)
  }
}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSparkSqlParser.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSparkSqlParser.scala
new file mode 100644
index 000000000..2080d4da1
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSparkSqlParser.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser
+
+import org.antlr.v4.runtime._
+import org.antlr.v4.runtime.atn.PredictionMode
+import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.{DataType, StructType}
+
+/**
+ * Parser extension for Fluss SQL extensions.
+ *
+ * @param delegate
+ *   The main Spark SQL parser.
+ */
class FlussSparkSqlParser(delegate: ParserInterface) extends ParserInterface {

  // Builds Fluss logical plans from the extension grammar's parse trees.
  private lazy val astBuilder = new FlussSqlAstBuilder(delegate)

  /**
   * Parses a SQL string into a logical plan.
   *
   * First tries the Fluss extension grammar; if the statement is not a Fluss
   * extension statement (any parse failure), falls back to the main Spark
   * parser so that regular SQL keeps working unchanged.
   */
  override def parsePlan(sqlText: String): LogicalPlan = {
    try {
      parse(sqlText)(parser => astBuilder.visitSingleStatement(parser.singleStatement()))
    } catch {
      case _: ParseException | _: ParseCancellationException =>
        delegate.parsePlan(sqlText)
    }
  }

  override def parseQuery(sqlText: String): LogicalPlan = parsePlan(sqlText)

  // Everything below is not part of the Fluss extension surface and is
  // forwarded verbatim to the main Spark parser.

  override def parseExpression(sqlText: String): Expression = {
    delegate.parseExpression(sqlText)
  }

  override def parseTableIdentifier(sqlText: String): TableIdentifier = {
    delegate.parseTableIdentifier(sqlText)
  }

  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
    delegate.parseFunctionIdentifier(sqlText)
  }

  override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
    delegate.parseMultipartIdentifier(sqlText)
  }

  override def parseTableSchema(sqlText: String): StructType = {
    delegate.parseTableSchema(sqlText)
  }

  override def parseDataType(sqlText: String): DataType = {
    delegate.parseDataType(sqlText)
  }

  /**
   * Runs the ANTLR lexer/parser for the Fluss extension grammar over
   * [[sqlText]] and applies [[toResult]] to the configured parser.
   *
   * Uses ANTLR's standard two-phase strategy: attempt the fast SLL prediction
   * mode first, and on a ParseCancellationException rewind the token stream
   * and retry with the slower but fully general LL mode.
   */
  private def parse[T](sqlText: String)(
      toResult: org.apache.spark.sql.catalyst.parser.FlussSparkSqlParserParser => T): T = {
    // Upper-case the lexer's view of the input so keywords match
    // case-insensitively (getText still returns the original text).
    val lexer = new FlussSparkSqlParserLexer(
      new UpperCaseCharStream(CharStreams.fromString(sqlText)))
    lexer.removeErrorListeners()
    lexer.addErrorListener(FlussParseErrorListener)

    val tokenStream = new CommonTokenStream(lexer)
    val parser =
      new org.apache.spark.sql.catalyst.parser.FlussSparkSqlParserParser(tokenStream)
    parser.removeErrorListeners()
    parser.addErrorListener(FlussParseErrorListener)

    try {
      try {
        // Fast path: SLL prediction handles the vast majority of inputs.
        parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
        toResult(parser)
      } catch {
        case _: ParseCancellationException =>
          // Slow path: rewind and re-parse with full LL prediction.
          tokenStream.seek(0)
          parser.reset()
          parser.getInterpreter.setPredictionMode(PredictionMode.LL)
          toResult(parser)
      }
    } catch {
      case e: ParseException if e.command.isDefined =>
        throw e
      case e: ParseException =>
        // Attach the offending SQL text for better error reporting.
        throw e.withCommand(sqlText)
      case e: AnalysisException =>
        // Re-surface analysis errors as parse errors anchored at the
        // originating position.
        val position = org.apache.spark.sql.catalyst.trees.Origin(e.line, e.startPosition)
        throw new ParseException(Option(sqlText), e.message, position, position)
    }
  }
}
+
/**
 * A [[CharStream]] wrapper that upper-cases the characters the lexer looks at
 * (via LA) while leaving getText untouched, so grammar keywords match
 * case-insensitively but the original text — including literals — is preserved.
 */
class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
  override def consume(): Unit = wrapped.consume()
  override def getSourceName: String = wrapped.getSourceName
  override def index(): Int = wrapped.index()
  override def mark(): Int = wrapped.mark()
  override def release(marker: Int): Unit = wrapped.release(marker)
  override def seek(where: Int): Unit = wrapped.seek(where)
  override def size(): Int = wrapped.size()

  // Returns the original (non-upper-cased) text.
  override def getText(interval: Interval): String = wrapped.getText(interval)

  override def LA(i: Int): Int = {
    wrapped.LA(i) match {
      // 0 and EOF are sentinel values, not code points: pass them through.
      case sentinel if sentinel == 0 || sentinel == IntStream.EOF => sentinel
      case codePoint => Character.toUpperCase(codePoint)
    }
  }
}
+
/** ANTLR error listener that converts syntax errors into Spark ParseExceptions. */
object FlussParseErrorListener extends BaseErrorListener {
  override def syntaxError(
      recognizer: Recognizer[_, _],
      offendingSymbol: scala.Any,
      line: Int,
      charPositionInLine: Int,
      msg: String,
      e: RecognitionException): Unit = {
    // Anchor the exception at the exact offending position so callers can
    // point at the failing token.
    val origin =
      org.apache.spark.sql.catalyst.trees.Origin(Some(line), Some(charPositionInLine))
    throw new ParseException(None, msg, origin, origin)
  }
}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSqlAstBuilder.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSqlAstBuilder.scala
new file mode 100644
index 000000000..053c23ea0
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSqlAstBuilder.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser
+
+import org.apache.fluss.spark.catalyst.plans.logical._
+
+import org.antlr.v4.runtime._
+import org.antlr.v4.runtime.misc.Interval
+import org.antlr.v4.runtime.tree.{ParseTree, TerminalNode}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.FlussSparkSqlParserParser._
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical._
+
+import scala.collection.JavaConverters._
+
+/**
+ * The AST Builder for Fluss SQL extensions.
+ *
+ * @param delegate
+ *   The main Spark SQL parser.
+ */
+class FlussSqlAstBuilder(delegate: ParserInterface)
+  extends FlussSparkSqlParserBaseVisitor[AnyRef]
+  with Logging {
+
+  /** Creates a single statement of extension statements. */
+  override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
+    visit(ctx.statement).asInstanceOf[LogicalPlan]
+  }
+
+  /** Creates a [[FlussCallStatement]] for a stored procedure call. */
+  override def visitCall(ctx: CallContext): FlussCallStatement = withOrigin(ctx) {
+    val name =
+      ctx.multipartIdentifier.parts.asScala.map(part => cleanIdentifier(part.getText)).toSeq
+    val args = ctx.callArgument.asScala.map(typedVisit[FlussCallArgument]).toSeq
+    FlussCallStatement(name, args)
+  }
+
+  /** Creates a positional argument in a stored procedure call. */
+  override def visitPositionalArgument(ctx: PositionalArgumentContext): FlussCallArgument =
+    withOrigin(ctx) {
+      val expression = typedVisit[Expression](ctx.expression)
+      FlussPositionalArgument(expression)
+    }
+
+  /** Creates a named argument in a stored procedure call. */
+  override def visitNamedArgument(ctx: NamedArgumentContext): FlussCallArgument = withOrigin(ctx) {
+    val name = cleanIdentifier(ctx.identifier.getText)
+    val expression = typedVisit[Expression](ctx.expression)
+    FlussNamedArgument(name, expression)
+  }
+
+  /** Creates a [[Expression]] in a positional and named argument. */
+  override def visitExpression(ctx: ExpressionContext): Expression = {
+    val sqlString = reconstructSqlString(ctx)
+    delegate.parseExpression(sqlString)
+  }
+
+  /** Returns a multi-part identifier as Seq[String]. */
+  override def visitMultipartIdentifier(ctx: MultipartIdentifierContext): Seq[String] =
+    withOrigin(ctx) {
+      ctx.parts.asScala.map(part => cleanIdentifier(part.getText)).toSeq
+    }
+
+  /** Remove backticks from identifier. */
+  private def cleanIdentifier(ident: String): String = {
+    if (ident.startsWith("`") && ident.endsWith("`")) {
+      ident.substring(1, ident.length - 1)
+    } else {
+      ident
+    }
+  }
+
+  private def reconstructSqlString(ctx: ParserRuleContext): String = {
+    ctx.children.asScala
+      .map {
+        case c: ParserRuleContext => reconstructSqlString(c)
+        case t: TerminalNode => t.getText
+      }
+      .mkString(" ")
+  }
+
+  private def typedVisit[T](ctx: ParseTree): T =
+    ctx.accept(this).asInstanceOf[T]
+
+  private def withOrigin[T](ctx: ParserRuleContext)(f: => T): T = {
+    val current = CurrentOrigin.get
+    CurrentOrigin.set(position(ctx.getStart))
+    try {
+      f
+    } finally {
+      CurrentOrigin.set(current)
+    }
+  }
+
+  private def position(token: Token): Origin = {
+    val opt = Option(token)
+    Origin(opt.map(_.getLine), opt.map(_.getCharPositionInLine))
+  }
+}
+
+case class Origin(
+    line: Option[Int] = None,
+    startPosition: Option[Int] = None,
+    startIndex: Option[Int] = None,
+    stopIndex: Option[Int] = None,
+    sqlText: Option[String] = None,
+    objectType: Option[String] = None,
+    objectName: Option[String] = None)
+
+object CurrentOrigin {
+  private val value = new ThreadLocal[Origin]() {
+    override def initialValue: Origin = Origin()
+  }
+
+  def get: Origin = value.get()
+  def set(o: Origin): Unit = value.set(o)
+  def reset(): Unit = value.set(Origin())
+
+  def withOrigin[A](o: Origin)(f: => A): A = {
+    val previous = get
+    set(o)
+    val ret =
+      try f
+      finally { set(previous) }
+    ret
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index 688ae872c..11287fb7d 100644
--- 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -53,6 +53,7 @@ class FlussSparkTestBase extends QueryTest with 
SharedSparkSession {
       .set(s"spark.sql.catalog.$DEFAULT_CATALOG", 
classOf[SparkCatalog].getName)
       .set(s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers", 
flussServer.getBootstrapServers)
       .set("spark.sql.defaultCatalog", DEFAULT_CATALOG)
+      .set("spark.sql.extensions", classOf[FlussSparkSessionExtensions].getName)
   }
 
   override protected def beforeAll(): Unit = {
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/extensions/CallStatementParserTest.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/extensions/CallStatementParserTest.scala
new file mode 100644
index 000000000..36024fd65
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/extensions/CallStatementParserTest.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.extensions
+
+import org.apache.fluss.spark.FlussSparkSessionExtensions
+import org.apache.fluss.spark.catalyst.plans.logical.{FlussCallArgument, 
FlussCallStatement, FlussNamedArgument, FlussPositionalArgument}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.types.DataTypes
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
+
+import java.math.BigDecimal
+import java.sql.Timestamp
+import java.time.Instant
+
+class CallStatementParserTest extends FunSuite with BeforeAndAfterEach {
+
+  private var spark: SparkSession = _
+  private var parser: ParserInterface = _
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    val optionalSession = SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession)
+    optionalSession.foreach(_.stop())
+    SparkSession.clearActiveSession()
+
+    spark = SparkSession
+      .builder()
+      .master("local[2]")
+      .config("spark.sql.extensions", classOf[FlussSparkSessionExtensions].getName)
+      .getOrCreate()
+
+    parser = spark.sessionState.sqlParser
+  }
+
+  override def afterEach(): Unit = {
+    if (spark != null) {
+      spark.stop()
+      spark = null
+      parser = null
+    }
+    super.afterEach()
+  }
+
+  test("testCallWithBackticks") {
+    val call =
+      parser.parsePlan("CALL cat.`system`.`no_args_func`()").asInstanceOf[FlussCallStatement]
+    assert(call.name.toList == List("cat", "system", "no_args_func"))
+    assert(call.args.size == 0)
+  }
+
+  test("testCallWithNamedArguments") {
+    val callStatement = parser
+      .parsePlan("CALL catalog.system.named_args_func(arg1 => 1, arg2 => 'test', arg3 => true)")
+      .asInstanceOf[FlussCallStatement]
+
+    assert(callStatement.name.toList == List("catalog", "system", "named_args_func"))
+    assert(callStatement.args.size == 3)
+    assertArgument(callStatement, 0, Some("arg1"), 1, DataTypes.IntegerType)
+    assertArgument(callStatement, 1, Some("arg2"), "test", DataTypes.StringType)
+    assertArgument(callStatement, 2, Some("arg3"), true, DataTypes.BooleanType)
+  }
+
+  test("testCallWithPositionalArguments") {
+    val callStatement = parser
+      .parsePlan(
+        "CALL catalog.system.positional_args_func(1, 
'${spark.sql.extensions}', 2L, true, 3.0D, 4.0e1, 500e-1BD, TIMESTAMP 
'2017-02-03T10:37:30.00Z')")
+      .asInstanceOf[FlussCallStatement]
+
+    assert(callStatement.name.toList == List("catalog", "system", 
"positional_args_func"))
+    assert(callStatement.args.size == 8)
+    assertArgument(callStatement, 0, None, 1, DataTypes.IntegerType)
+    assertArgument(
+      callStatement,
+      1,
+      None,
+      classOf[FlussSparkSessionExtensions].getName,
+      DataTypes.StringType)
+    assertArgument(callStatement, 2, None, 2L, DataTypes.LongType)
+    assertArgument(callStatement, 3, None, true, DataTypes.BooleanType)
+    assertArgument(callStatement, 4, None, 3.0, DataTypes.DoubleType)
+    assertArgument(callStatement, 5, None, 4.0e1, DataTypes.DoubleType)
+    assertArgument(
+      callStatement,
+      6,
+      None,
+      new BigDecimal("500e-1"),
+      DataTypes.createDecimalType(3, 1))
+    assertArgument(
+      callStatement,
+      7,
+      None,
+      Timestamp.from(Instant.parse("2017-02-03T10:37:30.00Z")),
+      DataTypes.TimestampType)
+  }
+
+  test("testCallWithMixedArguments") {
+    val callStatement = parser
+      .parsePlan("CALL catalog.system.mixed_args_func(arg1 => 1, 'test')")
+      .asInstanceOf[FlussCallStatement]
+
+    assert(callStatement.name.toList == List("catalog", "system", 
"mixed_args_func"))
+    assert(callStatement.args.size == 2)
+    assertArgument(callStatement, 0, Some("arg1"), 1, DataTypes.IntegerType)
+    assertArgument(callStatement, 1, None, "test", DataTypes.StringType)
+  }
+
+  test("testCallSimpleProcedure") {
+    val callStatement = parser
+      .parsePlan("CALL system.simple_procedure(table => 'db.table')")
+      .asInstanceOf[FlussCallStatement]
+
+    assert(callStatement.name.toList == List("system", "simple_procedure"))
+    assert(callStatement.args.size == 1)
+    assertArgument(callStatement, 0, Some("table"), "db.table", 
DataTypes.StringType)
+  }
+
+  private def assertArgument(
+      callStatement: FlussCallStatement,
+      index: Int,
+      expectedName: Option[String],
+      expectedValue: Any,
+      expectedType: org.apache.spark.sql.types.DataType): Unit = {
+
+    val callArgument = callStatement.args(index)
+
+    expectedName match {
+      case None =>
+        assert(callArgument.isInstanceOf[FlussPositionalArgument])
+      case Some(name) =>
+        val namedArgument = callArgument.asInstanceOf[FlussNamedArgument]
+        assert(namedArgument.name == name)
+    }
+
+    assert(callStatement.args(index).expr == Literal.create(expectedValue, 
expectedType))
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedureTest.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedureTest.scala
new file mode 100644
index 000000000..a2b0c4396
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedureTest.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.procedure
+
+import org.apache.fluss.config.ConfigOptions
+import org.apache.fluss.config.cluster.{AlterConfig, AlterConfigOpType}
+import org.apache.fluss.spark.FlussSparkTestBase
+
+import scala.collection.JavaConverters._
+
+class GetClusterConfigsProcedureTest extends FlussSparkTestBase {
+
+  test("get_cluster_configs: get all configurations") {
+    val result = sql(s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs()").collect()
+
+    assert(result.length > 0)
+
+    val firstRow = result.head
+    assert(firstRow.length == 3)
+    assert(
+      firstRow.schema.fieldNames.sameElements(Array("config_key", 
"config_value", "config_source")))
+
+    result.foreach {
+      row =>
+        assert(row.getString(0) != null)
+        assert(row.getString(1) != null)
+        assert(row.getString(2) != null)
+    }
+  }
+
+  test("get_cluster_configs: get specific configuration") {
+    val testKey = ConfigOptions.KV_SNAPSHOT_INTERVAL.key()
+
+    val result = sql(
+      s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => 
array('$testKey'))").collect()
+
+    assert(result.length == 1)
+    val row = result.head
+    assert(row.getString(0) == testKey)
+    assert(row.getString(1) != null)
+    assert(row.getString(2) != null)
+  }
+
+  test("get_cluster_configs: get multiple configurations") {
+    val key1 = ConfigOptions.KV_SNAPSHOT_INTERVAL.key()
+    val key2 = ConfigOptions.REMOTE_DATA_DIR.key()
+
+    val result =
+      sql(s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => 
array('$key1', '$key2'))")
+        .collect()
+
+    assert(result.length == 2)
+
+    val keys = result.map(_.getString(0)).toSet
+    assert(keys.contains(key1))
+    assert(keys.contains(key2))
+
+    result.foreach {
+      row =>
+        assert(row.getString(1) != null)
+        assert(row.getString(2) != null)
+    }
+  }
+
+  test("get_cluster_configs: get non-existent configuration") {
+    val nonExistentKey = "non.existent.config.key"
+
+    val result =
+      sql(s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => 
array('$nonExistentKey'))")
+        .collect()
+
+    assert(result.length == 0)
+  }
+
+  test("get_cluster_configs: mixed existent and non-existent configurations") {
+    val existentKey = ConfigOptions.KV_SNAPSHOT_INTERVAL.key()
+    val nonExistentKey = "non.existent.config.key"
+
+    val result = sql(
+      s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => 
array('$existentKey', '$nonExistentKey'))")
+      .collect()
+
+    assert(result.length == 1)
+    assert(result.head.getString(0) == existentKey)
+  }
+
+  test("get_cluster_configs: verify configuration source") {
+    val testKey = ConfigOptions.KV_SNAPSHOT_INTERVAL.key()
+
+    val result = sql(
+      s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => 
array('$testKey'))").collect()
+
+    assert(result.length == 1)
+    val row = result.head
+    val source = row.getString(2)
+
+    assert(source == "DYNAMIC" || source == "STATIC" || source == "DEFAULT")
+  }
+
+  test("get_cluster_configs: empty array parameter should return all configs") 
{
+    val result =
+      sql(s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => 
array())").collect()
+
+    assert(result.length > 0)
+  }
+}
diff --git a/website/blog/2024-11-29-fluss-open-source.md 
b/website/blog/2024-11-29-fluss-open-source.md
index 3e3effb41..42e3df116 100644
--- a/website/blog/2024-11-29-fluss-open-source.md
+++ b/website/blog/2024-11-29-fluss-open-source.md
@@ -40,7 +40,7 @@ Make sure to keep an eye on the project, give it a try and if 
you like it, don
 
 ### Getting Started
 - Visit the [GitHub repository](https://github.com/apache/fluss).
-- Check out the [quickstart guide](/docs/quickstart/flink/).
+- Check out the [quickstart guide](/docs/next/quickstart/flink/).
 
 ### Additional Resources
 - Announcement Blog Post: [Introducing Fluss: Unified Streaming Storage For 
Next-Generation Data 
Analytics](https://www.ververica.com/blog/introducing-fluss)
diff --git a/website/blog/2025-06-01-partial-updates.md 
b/website/blog/2025-06-01-partial-updates.md
index 55317235e..29d60b8c8 100644
--- a/website/blog/2025-06-01-partial-updates.md
+++ b/website/blog/2025-06-01-partial-updates.md
@@ -265,7 +265,7 @@ Flink SQL> SELECT * FROM user_rec_wide;
 
 Now let's switch to `batch` mode and query the current snapshot of the 
`user_rec_wide` table.
 
-But before that, let's start the [Tiering 
Service](/docs/maintenance/tiered-storage/lakehouse-storage/#start-the-datalake-tiering-service)
 that allows offloading the tables as `Lakehouse` tables.
+But before that, let's start the [Tiering 
Service](/docs/next/maintenance/tiered-storage/lakehouse-storage/#start-the-datalake-tiering-service)
 that allows offloading the tables as `Lakehouse` tables.
 
 **Step 7:** Open a new terminal đź’» in the `Coordinator Server` and run the 
following command to start the `Tiering Service`:
 ```shell
@@ -297,7 +297,7 @@ Flink SQL> SELECT * FROM user_rec_wide;
 ### Conclusion
 Partial updates in Fluss enable an alternative approach in how we design 
streaming data pipelines for enriching or joining data. 
 
-When all your sources share a primary key - otherwise you can mix & match 
[streaming lookup joins](/docs/engine-flink/lookups/#lookup) - you can turn the 
problem on its head: update a unified table incrementally, rather than joining 
streams on the fly.
+When all your sources share a primary key - otherwise you can mix & match 
[streaming lookup joins](/docs/next/engine-flink/lookups/#lookup) - you can 
turn the problem on its head: update a unified table incrementally, rather than 
joining streams on the fly.
 
 The result is a more scalable, maintainable, and efficient pipeline. 
 Engineers can spend less time wrestling with Flink’s state, checkpoints and 
join mechanics, and more time delivering fresh, integrated data to power 
real-time analytics and applications. 
diff --git a/website/blog/releases/0.6.md b/website/blog/releases/0.6.md
index 287b5cb37..46b1a5624 100644
--- a/website/blog/releases/0.6.md
+++ b/website/blog/releases/0.6.md
@@ -184,7 +184,7 @@ SELECT * FROM fluss_left_table INNER JOIN fluss_right_table
 Flink performs lookups on Fluss tables using the Join Key, which serves as the 
Bucket Key for the Fluss table.
 This allows it to leverage the prefix index of the primary key in the Fluss 
table, enabling highly efficient lookup queries.
 This feature in Fluss is referred to as Prefix Lookup. Currently, Prefix 
Lookup can also be used to perform one-to-many lookup queries.
-For more details, please refer to the [Prefix 
Lookup](/docs/engine-flink/lookups/#prefix-lookup) documentation.
+For more details, please refer to the [Prefix 
Lookup](/docs/next/engine-flink/lookups/#prefix-lookup) documentation.
 
 ## Stability & Performance Improvements
 
diff --git a/website/blog/releases/0.7.md b/website/blog/releases/0.7.md
index a8ebf1bc0..74b3ba169 100644
--- a/website/blog/releases/0.7.md
+++ b/website/blog/releases/0.7.md
@@ -79,7 +79,7 @@ flink run /path/to/fluss-flink-tiering-0.7.0.jar \
     --datalake.paimon.warehouse /path/to/warehouse
 ```
 
-See more details in the [Streaming Lakehouse 
documentation](/docs/maintenance/tiered-storage/lakehouse-storage/).
+See more details in the [Streaming Lakehouse 
documentation](/docs/next/maintenance/tiered-storage/lakehouse-storage/).
 
 ## Streaming Partition Pruning
 Partitioning is a foundational technique in modern data warehouses and 
Lakehouse architectures for optimizing query performance by 
@@ -131,7 +131,7 @@ CALL admin_catalog.sys.add_acl(
 );
 ```
 
-For details, please refer to the [Security 
documentation](/docs/security/overview/) and quickstarts.
+For details, please refer to the [Security 
documentation](/docs/next/security/overview/) and quickstarts.
 
 ## Flink DataStream Connector
 Fluss 0.7 officially introduces the DataStream Connector, supporting both 
Source and Sink for reading and writing log and primary key tables. Users can 
now seamlessly integrate Fluss tables into Flink DataStream pipelines.
@@ -155,7 +155,7 @@ DataStreamSource<Order> stream = env.fromSource(
 );
 ```
 
-For usage examples and configuration parameters, see the [DataStream Connector 
documentation](/docs/engine-flink/datastream/).
+For usage examples and configuration parameters, see the [DataStream Connector 
documentation](/docs/next/engine-flink/datastream/).
 
 
 ## Fluss Java Client
@@ -164,7 +164,7 @@ In this version, we officially release the Fluss Java 
Client, a client library d
 * **Table API:** For table-based data operations, supporting streaming 
reads/writes, updates, deletions, and point queries.
 * **Admin API:** For metadata management, including cluster management, table 
lifecycle, and access control.
 
-The client supports forward and backward compatibility, ensuring smooth 
upgrades across Fluss versions. With the Fluss Java Client, developers can 
build online applications and data ingestion services based on Fluss, as well 
as enterprise-level components such as Fluss management platforms and 
operations monitoring systems. For detailed usage instructions, please refer to 
the official documentation: [Fluss Java Client User 
Guide](/docs/apis/java-client/).
+The client supports forward and backward compatibility, ensuring smooth 
upgrades across Fluss versions. With the Fluss Java Client, developers can 
build online applications and data ingestion services based on Fluss, as well 
as enterprise-level components such as Fluss management platforms and 
operations monitoring systems. For detailed usage instructions, please refer to 
the official documentation: [Fluss Java Client User 
Guide](/docs/next/apis/java-client/).
 
 Fluss uses Apache Arrow as its underlying storage format, enabling efficient 
cross-language extensions. A **Fluss Python Client** is planned for future 
releases, leveraging the rich ecosystem of **PyArrow** to integrate with 
popular data analysis tools such as **Pandas** and **DuckDB**. 
 This will further lower the barrier for real-time data exploration and 
analytics.
diff --git a/website/blog/releases/0.8.md b/website/blog/releases/0.8.md
index c302685af..963ab6749 100644
--- a/website/blog/releases/0.8.md
+++ b/website/blog/releases/0.8.md
@@ -51,7 +51,7 @@ datalake.iceberg.type: hadoop
 datalake.iceberg.warehouse: /path/to/iceberg
 ```
 
-You can find more detailed instructions in the [Iceberg Lakehouse 
documentation](/docs/streaming-lakehouse/integrate-data-lakes/iceberg/).
+You can find more detailed instructions in the [Iceberg Lakehouse 
documentation](/docs/next/streaming-lakehouse/integrate-data-lakes/iceberg/).
 
 ## Real-Time Multimodal AI Analytics with Lance
 
@@ -80,7 +80,7 @@ datalake.lance.access_key_id: <access_key_id>
 datalake.lance.secret_access_key: <secret_access_key>
 ```
 
-See the [LanceDB blog post](https://lancedb.com/blog/fluss-integration/) for 
the full integration. You also can find more detailed instructions in the 
[Lance Lakehouse 
documentation](/docs/streaming-lakehouse/integrate-data-lakes/lance/).
+See the [LanceDB blog post](https://lancedb.com/blog/fluss-integration/) for 
the full integration. You also can find more detailed instructions in the 
[Lance Lakehouse 
documentation](/docs/next/streaming-lakehouse/integrate-data-lakes/lance/).
 
 ## Flink 2.1
 
@@ -102,7 +102,7 @@ Below is a performance comparison (CPU, memory, state size, 
checkpoint interval)
 ![](../assets/taobao_practice/performance_delta2.png)
 
 
-You can find more detailed instructions in the [Delta Join 
documentation](/docs/engine-flink/delta-joins/).
+You can find more detailed instructions in the [Delta Join 
documentation](/docs/next/engine-flink/delta-joins/).
 
 ### Materialized Table
 
@@ -135,7 +135,7 @@ WITH(
 );
 ```
 
-You can find more detailed instructions in the [Materialized Table 
documentation](/docs/engine-flink/ddl/#materialized-table).
+You can find more detailed instructions in the [Materialized Table 
documentation](/docs/next/engine-flink/ddl/#materialized-table).
 
 ## Stability
 
@@ -144,7 +144,7 @@ Through continuous validation across multiple business 
units within Alibaba Grou
 These improvements substantially enhance Fluss’s robustness in 
mission-critical streaming use cases.
 
 Key improvements include:
-- **[Graceful Shutdown](/docs/maintenance/operations/graceful-shutdown/)**: 
Fluss supports cluster rolling upgrade, and we introduced a graceful shutdown 
mechanism for TabletServers in this version. During shutdown, leadership is 
proactively migrated before termination, ensuring that read/write latency 
remains unaffected during rolling upgrades.
+- **[Graceful 
Shutdown](/docs/next/maintenance/operations/graceful-shutdown/)**: Fluss 
supports cluster rolling upgrade, and we introduced a graceful shutdown 
mechanism for TabletServers in this version. During shutdown, leadership is 
proactively migrated before termination, ensuring that read/write latency 
remains unaffected during rolling upgrades.
 - **Accelerated Coordinator Event Processing**: Optimized the Coordinator’s 
event handling mechanism through asynchronous processing and batched ZooKeeper 
operations. As a result, all events are now processed in milliseconds.
 - **Faster Coordinator Recovery**: Parallelized initialization cuts 
Coordinator startup time from 10 minutes to just 20 seconds in production-scale 
benchmarks, this dramatically improves service availability and recovery speed.
 - **Optimized Server Metrics**: Refined metric granularity and reporting logic 
to reduce telemetry volume by 90% while preserving full observability.
@@ -180,7 +180,7 @@ When you issue a `ALTER TABLE ... SET` command to update 
storage options on a ta
 
 This capability is especially useful for tuning performance, adapting to 
changing data patterns, or complying with evolving data governance 
requirements—all without service interruption.
 
-You can find more detailed instructions in the [Updating Configs 
documentation](/docs/maintenance/operations/updating-configs/).
+You can find more detailed instructions in the [Updating Configs 
documentation](/docs/next/maintenance/operations/updating-configs/).
 
 ## Helm Charts
 
@@ -188,7 +188,7 @@ This release also introduced Helm Charts. With this 
addition, users can now depl
 The Helm chart simplifies provisioning, upgrades, and scaling by packaging 
configuration, manifests, and dependencies into a single, versioned release.
 This should help users running Fluss on Kubernetes faster, more reliably, and 
with easier integration into existing CI/CD and observability setups, 
significantly lowering the barrier for teams adopting Fluss in production.
 
-You can find more detailed instructions in the [Deploying with Helm 
documentation](/docs/install-deploy/deploying-with-helm/).
+You can find more detailed instructions in the [Deploying with Helm 
documentation](/docs/next/install-deploy/deploying-with-helm/).
 
 ## Java Version Upgrade
 
@@ -214,7 +214,7 @@ The Fluss community is committed to delivering a smooth 
upgrade experience. This
 - Clients from version 0.7 can seamlessly connect to version 0.8 servers,
 - Clients from version 0.8 are also compatible with version 0.7 servers.
 
-However, Fluss 0.8 is the first official release since the project entered the 
Apache Incubator, and it includes changes such as package path updates (e.g., 
groupId and Java package names). As a result, applications that depend on the 
Fluss SDK will need to make corresponding code adjustments when upgrading to 
version 0.8. Please refer to the [upgrade 
notes](/docs/maintenance/operations/upgrade-notes-0.8/) for a comprehensive 
list of adjustments to make and issues to check during the upg [...]
+However, Fluss 0.8 is the first official release since the project entered the 
Apache Incubator, and it includes changes such as package path updates (e.g., 
groupId and Java package names). As a result, applications that depend on the 
Fluss SDK will need to make corresponding code adjustments when upgrading to 
version 0.8. Please refer to the [upgrade 
notes](/docs/next/maintenance/operations/upgrade-notes-0.8/) for a 
comprehensive list of adjustments to make and issues to check during th [...]
 
 For a detailed list of all changes in this release, please refer to the 
[release notes](https://github.com/apache/fluss/releases/tag/v0.8.0-incubating).
 
diff --git a/website/docs/engine-spark/_category_.json 
b/website/docs/engine-spark/_category_.json
new file mode 100644
index 000000000..e81d6a72f
--- /dev/null
+++ b/website/docs/engine-spark/_category_.json
@@ -0,0 +1,4 @@
+{
+  "label": "Engine Spark",
+  "position": 7
+}
diff --git a/website/docs/engine-spark/procedures.md 
b/website/docs/engine-spark/procedures.md
new file mode 100644
index 000000000..0d8b01047
--- /dev/null
+++ b/website/docs/engine-spark/procedures.md
@@ -0,0 +1,107 @@
+---
+sidebar_label: Procedures
+title: Procedures
+sidebar_position: 3
+---
+
+# Procedures
+
+Fluss provides stored procedures to perform administrative and management 
operations through Spark SQL. All procedures are located in the `sys` namespace 
and can be invoked using the `CALL` statement.
+
+## Configuration
+
+To enable Fluss procedures in Spark, you need to configure the Spark session 
extensions:
+
+```scala
+spark.conf.set("spark.sql.extensions", 
"org.apache.fluss.spark.FlussSparkSessionExtensions")
+```
+
+Or in `spark-defaults.conf`:
+
+```properties
+spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions
+```
+
+## Syntax
+
+The general syntax for calling a procedure is:
+
+```sql
+CALL [catalog_name.]sys.procedure_name(
+  parameter_name => 'value',
+  another_parameter => 'value'
+)
+```
+
+### Argument Passing
+
+Procedures support two ways to pass arguments:
+
+1. **Named Arguments** (recommended):
+   ```sql
+   CALL catalog.sys.procedure_name(parameter => 'value')
+   ```
+
+2. **Positional Arguments**:
+   ```sql
+   CALL catalog.sys.procedure_name('value')
+   ```
+
+Note: You cannot mix named and positional arguments in a single procedure call.
+
+## Cluster Configuration Procedures
+
+Fluss provides procedures to dynamically manage cluster configurations without 
requiring a server restart.
+
+### get_cluster_configs
+
+Retrieve cluster configuration values.
+
+**Syntax:**
+
+```sql
+CALL [catalog_name.]sys.get_cluster_configs()
+CALL [catalog_name.]sys.get_cluster_configs(config_keys => ARRAY('key1', 'key2'))
+```
+
+**Parameters:**
+
+- `config_keys` (optional): Array of configuration keys to retrieve. If 
omitted, returns all cluster configurations.
+
+**Returns:** A table with columns:
+
+- `config_key`: The configuration key name
+- `config_value`: The current value
+- `config_source`: The source of the configuration (e.g., `DEFAULT`, 
`DYNAMIC`, `STATIC`)
+
+**Example:**
+
+```sql title="Spark SQL"
+-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if 
different)
+USE fluss_catalog;
+
+-- Get all cluster configurations
+CALL sys.get_cluster_configs();
+
+-- Get specific configurations
+CALL sys.get_cluster_configs(config_keys => ARRAY('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format'));
+```
+
+## Error Handling
+
+Procedures will throw exceptions in the following cases:
+
+- **Missing Required Parameters**: If a required parameter is not provided
+- **Invalid Procedure Name**: If the specified procedure does not exist
+- **Type Mismatch**: If a parameter value cannot be converted to the expected 
type
+- **Permission Denied**: If the user does not have permission to perform the 
operation
+
+## Implementation Notes
+
+- Procedures are executed synchronously and return results immediately
+- The `sys` namespace is reserved for system procedures
+- Custom procedures can be added by implementing the `Procedure` interface
+
+## See Also
+
+- [Flink Procedures](../../engine-flink/procedures)
diff --git a/website/docs/maintenance/operations/rebalance.md 
b/website/docs/maintenance/operations/rebalance.md
index 48d89ac40..eafb19827 100644
--- a/website/docs/maintenance/operations/rebalance.md
+++ b/website/docs/maintenance/operations/rebalance.md
@@ -210,7 +210,7 @@ public class RebalanceExample {
 
 ## Using Flink Stored Procedures
 
-For rebalancing operations, Fluss provides convenient Flink stored procedures 
that can be called directly from Flink SQL. See [Rebalance 
Procedures](/docs/engine-flink/procedures.md#rebalance-procedures) for detailed 
documentation on using the following procedures:
+For rebalancing operations, Fluss provides convenient Flink stored procedures 
that can be called directly from Flink SQL. See [Rebalance 
Procedures](../../../engine-flink/procedures#rebalance-procedures) for detailed 
documentation on using the following procedures:
 
 - **add_server_tag**: Tag servers before rebalancing
 - **remove_server_tag**: Remove tags after rebalancing
diff --git a/website/docs/maintenance/tiered-storage/lakehouse-storage.md 
b/website/docs/maintenance/tiered-storage/lakehouse-storage.md
index 35b2394d7..89237c31c 100644
--- a/website/docs/maintenance/tiered-storage/lakehouse-storage.md
+++ b/website/docs/maintenance/tiered-storage/lakehouse-storage.md
@@ -51,7 +51,7 @@ For example:
 - If you are using Paimon filesystem catalog with OSS filesystem, you need to 
put `paimon-oss-<paimon_version>.jar` into directory 
`${FLUSS_HOME}/plugins/paimon/`.
 - If you are using Paimon Hive catalog, you need to put [the flink sql hive 
connector 
jar](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/hive/overview/#using-bundled-hive-jar)
 into directory `${FLUSS_HOME}/plugins/paimon/`.
 
-Additionally, when using Paimon with HDFS, you must also configure the Fluss 
server with the Hadoop environment. See the [HDFS setup 
guide](/docs/maintenance/filesystems/hdfs.md) for detailed instructions.
+Additionally, when using Paimon with HDFS, you must also configure the Fluss 
server with the Hadoop environment. See the [HDFS setup 
guide](../../filesystems/hdfs) for detailed instructions.
 
 ### Start The Datalake Tiering Service
 Then, you must start the datalake tiering service to tier Fluss's data to the 
lakehouse storage.
diff --git a/website/src/pages/index.tsx b/website/src/pages/index.tsx
index 06aa2fcfc..5cf202b53 100644
--- a/website/src/pages/index.tsx
+++ b/website/src/pages/index.tsx
@@ -38,7 +38,7 @@ function HomepageHeader() {
                 <div className={styles.buttons}>
                     <Link
                         className={clsx("hero_button button button--primary 
button--lg", styles.buttonWidth)}
-                        to="/docs/quickstart/flink">
+                        to="/docs/next/quickstart/flink">
                         Quick Start
                     </Link>
 


Reply via email to