This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit a8b38a6dfff553d94f6632aec7cd0ac9a7de0f20
Author: Jark Wu <[email protected]>
AuthorDate: Tue Feb 3 15:09:01 2026 +0800

    [spark] Rename parser to FlussSqlExtension and add generated class into source code
---
 fluss-spark/fluss-spark-common/pom.xml             |  2 ++
 ...FlussSparkSqlParser.g4 => FlussSqlExtension.g4} |  2 +-
 .../fluss/spark/catalog/WithFlussAdmin.scala       | 10 ++++----
 .../fluss/spark/procedure/BaseProcedure.scala      | 21 ++---------------
 .../procedure/GetClusterConfigsProcedure.scala     |  1 -
 .../sql/catalyst/parser/FlussSparkSqlParser.scala  |  8 +++----
 .../sql/catalyst/parser/FlussSqlAstBuilder.scala   |  6 ++---
 .../procedure/GetClusterConfigsProcedureTest.scala | 27 ++++++++--------------
 8 files changed, 24 insertions(+), 53 deletions(-)

diff --git a/fluss-spark/fluss-spark-common/pom.xml b/fluss-spark/fluss-spark-common/pom.xml
index 95b7c3bfb..267b0984b 100644
--- a/fluss-spark/fluss-spark-common/pom.xml
+++ b/fluss-spark/fluss-spark-common/pom.xml
@@ -64,6 +64,8 @@
                 </executions>
                 <configuration>
                     <visitor>true</visitor>
+                    <listener>true</listener>
+                    <sourceDirectory>src/main/antlr4</sourceDirectory>
                 </configuration>
             </plugin>
 
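With <listener>true</listener>, the antlr4-maven-plugin now emits a
FlussSqlExtensionBaseListener alongside the lexer, parser, and base visitor,
and <sourceDirectory> anchors package derivation at src/main/antlr4. A minimal
sketch of driving the generated classes directly; the entry-rule name
singleStatement is an assumption, the real rule is defined in
FlussSqlExtension.g4:

    import org.antlr.v4.runtime.{CharStreams, CommonTokenStream}
    import org.antlr.v4.runtime.tree.ParseTreeWalker
    import org.apache.spark.sql.catalyst.parser.{FlussSqlExtensionBaseListener, FlussSqlExtensionLexer}
    import org.apache.spark.sql.catalyst.parser.FlussSqlExtensionParser

    object GeneratedClassesSketch extends App {
      val lexer = new FlussSqlExtensionLexer(CharStreams.fromString("CALL SYS.GET_CLUSTER_CONFIGS()"))
      val parser = new FlussSqlExtensionParser(new CommonTokenStream(lexer))
      // singleStatement is an assumed entry rule; see FlussSqlExtension.g4.
      val tree = parser.singleStatement()
      // The generated base listener has empty callbacks; override what you need.
      ParseTreeWalker.DEFAULT.walk(new FlussSqlExtensionBaseListener {}, tree)
    }
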
diff --git a/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSparkSqlParser.g4 b/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSqlExtension.g4
similarity index 99%
rename from fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSparkSqlParser.g4
rename to fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSqlExtension.g4
index d377e677c..8ebb31cc1 100644
--- a/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSparkSqlParser.g4
+++ b/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSqlExtension.g4
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-grammar FlussSparkSqlParser;
+grammar FlussSqlExtension;
 
 @lexer::members {
   public boolean isValidDecimal() {
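
Renaming the grammar is what drives every class rename in the Scala files
below: ANTLR names its artifacts after the grammar, so the old
FlussSparkSqlParser grammar yielded the doubled-up FlussSparkSqlParserParser
and FlussSparkSqlParserLexer.

    // Generated from "grammar FlussSqlExtension;":
    //   FlussSqlExtensionLexer, FlussSqlExtensionParser,
    //   FlussSqlExtensionBaseVisitor[T], FlussSqlExtensionBaseListener
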
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
index 3e7653cdf..0579bca10 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
@@ -54,11 +54,6 @@ trait WithFlussAdmin extends AutoCloseable {
     _flussConfig
   }
 
-  protected def admin: Admin = {
-    Preconditions.checkNotNull(_admin, "Fluss Admin is not initialized.")
-    _admin
-  }
-
   /**
    * Gets the Fluss Admin client. This is a public accessor for procedures and other external
    * components that need admin access.
@@ -66,7 +61,10 @@ trait WithFlussAdmin extends AutoCloseable {
    * @return
    *   the Admin instance
    */
-  def getAdmin: Admin = admin
+  def admin: Admin = {
+    Preconditions.checkNotNull(_admin, "Fluss Admin is not initialized.")
+    _admin
+  }
 
   override def close(): Unit = {
     IOUtils.closeQuietly(_admin, "fluss-admin")
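
Collapsing getAdmin into the public admin accessor leaves a single way to
reach the client. A sketch of what an external caller looks like now; the
object and helper names are illustrative, not part of this change:

    import org.apache.fluss.spark.catalog.WithFlussAdmin
    import org.apache.spark.sql.connector.catalog.TableCatalog
    import scala.collection.JavaConverters._

    object AdminAccessSketch {
      // Prints the cluster configs if the catalog is Fluss-backed.
      def printClusterConfigs(catalog: TableCatalog): Unit = catalog match {
        case withAdmin: WithFlussAdmin =>
          // describeClusterConfigs() returns a future; the no-arg get() blocks.
          withAdmin.admin.describeClusterConfigs().get().asScala.foreach(println)
        case _ =>
          println(s"Not a Fluss catalog: ${catalog.getClass.getName}")
      }
    }
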
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala
index f7f07b475..ef70f89e9 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala
@@ -97,32 +97,15 @@ abstract class BaseProcedure(tableCatalog: TableCatalog) extends Procedure {
    * @return
    *   the Admin instance
    */
-  protected def getAdmin(): Admin = {
+  protected def admin: Admin = {
     tableCatalog match {
-      case withAdmin: WithFlussAdmin => withAdmin.getAdmin
+      case withAdmin: WithFlussAdmin => withAdmin.admin
       case _ =>
         throw new IllegalStateException(
           s"Catalog does not support Fluss admin: 
${tableCatalog.getClass.getName}")
     }
   }
 
-  /**
-   * Gets the Fluss Admin client from a SparkTable.
-   *
-   * @param table
-   *   the SparkTable instance
-   * @return
-   *   the Admin instance
-   */
-  protected def getAdmin(table: SparkTable): Admin = {
-    table match {
-      case abstractTable: AbstractSparkTable => abstractTable.admin
-      case _ =>
-        throw new IllegalArgumentException(
-          s"Table is not an AbstractSparkTable: ${table.getClass.getName}")
-    }
-  }
-
   /**
    * Creates a new InternalRow with the given values.
    *
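
With the SparkTable overload gone, every procedure resolves the Admin through
its catalog. Callers that prefer probing over catching the
IllegalStateException can wrap the same match; this variant is illustrative,
not part of the change:

    import org.apache.fluss.spark.catalog.WithFlussAdmin
    import org.apache.spark.sql.connector.catalog.TableCatalog

    object AdminResolutionSketch {
      // Same resolution rule as BaseProcedure.admin, but total: returns None
      // instead of throwing for non-Fluss catalogs.
      def adminOf(catalog: TableCatalog) = catalog match {
        case withAdmin: WithFlussAdmin => Some(withAdmin.admin)
        case _ => None
      }
    }
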
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedure.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedure.scala
index cdf3403da..15c9886ee 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedure.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedure.scala
@@ -65,7 +65,6 @@ class GetClusterConfigsProcedure(tableCatalog: TableCatalog) extends BaseProcedu
 
   private def getConfigs(configKeys: Array[String]): Array[InternalRow] = {
     try {
-      val admin = getAdmin()
       val configs = admin.describeClusterConfigs().get().asScala
 
       if (configKeys.isEmpty) {
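
After the removal, admin in getConfigs binds to the accessor inherited from
BaseProcedure. The procedure itself is reached through CALL statements; a
sketch of the invocation from a Spark session, assuming the Fluss catalog and
SQL extensions are configured and using the illustrative catalog name "fluss":

    import org.apache.spark.sql.SparkSession

    object CallProcedureSketch extends App {
      // Assumes the session is configured with the Fluss catalog and parser
      // extensions; "fluss" stands in for the configured catalog name.
      val spark = SparkSession.builder().master("local[1]").getOrCreate()
      spark
        .sql("CALL fluss.sys.get_cluster_configs(config_keys => array('kv.snapshot.interval'))")
        .show()
    }
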
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSparkSqlParser.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSparkSqlParser.scala
index 2080d4da1..8f5470855 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSparkSqlParser.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSparkSqlParser.scala
@@ -23,7 +23,6 @@ import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.types.{DataType, StructType}
 
@@ -73,15 +72,14 @@ class FlussSparkSqlParser(delegate: ParserInterface) extends ParserInterface {
   }
 
   private def parse[T](sqlText: String)(
-      toResult: org.apache.spark.sql.catalyst.parser.FlussSparkSqlParserParser => T): T = {
-    val lexer = new FlussSparkSqlParserLexer(
-      new UpperCaseCharStream(CharStreams.fromString(sqlText)))
+      toResult: org.apache.spark.sql.catalyst.parser.FlussSqlExtensionParser => T): T = {
+    val lexer = new FlussSqlExtensionLexer(new UpperCaseCharStream(CharStreams.fromString(sqlText)))
     lexer.removeErrorListeners()
     lexer.addErrorListener(FlussParseErrorListener)
 
     val tokenStream = new CommonTokenStream(lexer)
     val parser =
-      new org.apache.spark.sql.catalyst.parser.FlussSparkSqlParserParser(tokenStream)
+      new org.apache.spark.sql.catalyst.parser.FlussSqlExtensionParser(tokenStream)
     parser.removeErrorListeners()
     parser.addErrorListener(FlussParseErrorListener)
 
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSqlAstBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSqlAstBuilder.scala
index 053c23ea0..4cfa0e986 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSqlAstBuilder.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSqlAstBuilder.scala
@@ -20,12 +20,10 @@ package org.apache.spark.sql.catalyst.parser
 import org.apache.fluss.spark.catalyst.plans.logical._
 
 import org.antlr.v4.runtime._
-import org.antlr.v4.runtime.misc.Interval
 import org.antlr.v4.runtime.tree.{ParseTree, TerminalNode}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.parser.FlussSparkSqlParserParser._
-import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.parser.FlussSqlExtensionParser._
 import org.apache.spark.sql.catalyst.plans.logical._
 
 import scala.collection.JavaConverters._
@@ -37,7 +35,7 @@ import scala.collection.JavaConverters._
  *   The main Spark SQL parser.
  */
 class FlussSqlAstBuilder(delegate: ParserInterface)
-  extends FlussSparkSqlParserBaseVisitor[AnyRef]
+  extends FlussSqlExtensionBaseVisitor[AnyRef]
   with Logging {
 
   /** Creates a single statement of extension statements. */
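
The base class swap is the whole change here: the regenerated
FlussSqlExtensionBaseVisitor supplies the defaultResult/aggregateResult/
visitTerminal hooks from ANTLR's AbstractParseTreeVisitor. A toy visitor shows
the shape; rule-specific overrides are omitted because the grammar's rule
names are not part of this hunk:

    import org.antlr.v4.runtime.tree.TerminalNode
    import org.apache.spark.sql.catalyst.parser.FlussSqlExtensionBaseVisitor

    // Collects the text of every terminal node in visiting order.
    class TokenTextCollector extends FlussSqlExtensionBaseVisitor[String] {
      override def defaultResult(): String = ""
      override def aggregateResult(aggregate: String, nextResult: String): String =
        aggregate + nextResult
      override def visitTerminal(node: TerminalNode): String = node.getText + " "
    }
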
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedureTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedureTest.scala
index a2b0c4396..1f5fa4f94 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedureTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedureTest.scala
@@ -18,11 +18,8 @@
 package org.apache.fluss.spark.procedure
 
 import org.apache.fluss.config.ConfigOptions
-import org.apache.fluss.config.cluster.{AlterConfig, AlterConfigOpType}
 import org.apache.fluss.spark.FlussSparkTestBase
 
-import scala.collection.JavaConverters._
-
 class GetClusterConfigsProcedureTest extends FlussSparkTestBase {
 
   test("get_cluster_configs: get all configurations") {
@@ -41,6 +38,10 @@ class GetClusterConfigsProcedureTest extends FlussSparkTestBase {
         assert(row.getString(1) != null)
         assert(row.getString(2) != null)
     }
+
+    val result2 =
+      sql(s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => 
array())").collect()
+    assertResult(result)(result2)
   }
 
   test("get_cluster_configs: get specific configuration") {
@@ -50,15 +51,12 @@ class GetClusterConfigsProcedureTest extends FlussSparkTestBase {
       s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => 
array('$testKey'))").collect()
 
     assert(result.length == 1)
-    val row = result.head
-    assert(row.getString(0) == testKey)
-    assert(row.getString(1) != null)
-    assert(row.getString(2) != null)
+    assert(result.head.toString() == "[kv.snapshot.interval,1 s,STATIC]")
   }
 
   test("get_cluster_configs: get multiple configurations") {
     val key1 = ConfigOptions.KV_SNAPSHOT_INTERVAL.key()
-    val key2 = ConfigOptions.REMOTE_DATA_DIR.key()
+    val key2 = ConfigOptions.BIND_LISTENERS.key()
 
     val result =
       sql(s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => 
array('$key1', '$key2'))")
@@ -66,15 +64,10 @@ class GetClusterConfigsProcedureTest extends FlussSparkTestBase {
 
     assert(result.length == 2)
 
-    val keys = result.map(_.getString(0)).toSet
-    assert(keys.contains(key1))
-    assert(keys.contains(key2))
-
-    result.foreach {
-      row =>
-        assert(row.getString(1) != null)
-        assert(row.getString(2) != null)
-    }
+    // convert the result into a map of key to value for easy verification, key is the first column
+    val kvMap: Map[String, String] = result.map(r => r.getString(0) -> r.toString).toMap
+    assert(kvMap.getOrElse(key1, "") == s"[$key1,1 s,STATIC]")
+    assert(kvMap.getOrElse(key2, "") == s"[$key2,FLUSS://localhost:0,STATIC]")
   }
 
   test("get_cluster_configs: get non-existent configuration") {

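The rewritten assertions compare whole rows by their string form. Spark's Row
renders as the field values comma-joined inside brackets, which is where the
expected literals come from; a minimal, self-contained check:

    import org.apache.spark.sql.Row

    object RowFormatCheck extends App {
      // Row.toString is mkString("[", ",", "]") over the fields.
      val row = Row("kv.snapshot.interval", "1 s", "STATIC")
      assert(row.toString == "[kv.snapshot.interval,1 s,STATIC]")
    }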