This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
commit a8b38a6dfff553d94f6632aec7cd0ac9a7de0f20
Author: Jark Wu <[email protected]>
AuthorDate: Tue Feb 3 15:09:01 2026 +0800

    [spark] Rename parser to FlussSqlExtension and add generated class into source code
---
 fluss-spark/fluss-spark-common/pom.xml             |  2 ++
 ...FlussSparkSqlParser.g4 => FlussSqlExtension.g4} |  2 +-
 .../fluss/spark/catalog/WithFlussAdmin.scala       | 10 ++++----
 .../fluss/spark/procedure/BaseProcedure.scala      | 21 ++---------------
 .../procedure/GetClusterConfigsProcedure.scala     |  1 -
 .../sql/catalyst/parser/FlussSparkSqlParser.scala  |  8 +++----
 .../sql/catalyst/parser/FlussSqlAstBuilder.scala   |  6 ++---
 .../procedure/GetClusterConfigsProcedureTest.scala | 27 ++++++++--------------
 8 files changed, 24 insertions(+), 53 deletions(-)

diff --git a/fluss-spark/fluss-spark-common/pom.xml b/fluss-spark/fluss-spark-common/pom.xml
index 95b7c3bfb..267b0984b 100644
--- a/fluss-spark/fluss-spark-common/pom.xml
+++ b/fluss-spark/fluss-spark-common/pom.xml
@@ -64,6 +64,8 @@
                 </executions>
                 <configuration>
                     <visitor>true</visitor>
+                    <listener>true</listener>
+                    <sourceDirectory>src/main/antlr4</sourceDirectory>
                 </configuration>
             </plugin>

diff --git a/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSparkSqlParser.g4 b/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSqlExtension.g4
similarity index 99%
rename from fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSparkSqlParser.g4
rename to fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSqlExtension.g4
index d377e677c..8ebb31cc1 100644
--- a/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSparkSqlParser.g4
+++ b/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSqlExtension.g4
@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-grammar FlussSparkSqlParser;
+grammar FlussSqlExtension;

 @lexer::members {
   public boolean isValidDecimal() {

diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
index 3e7653cdf..0579bca10 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
@@ -54,11 +54,6 @@ trait WithFlussAdmin extends AutoCloseable {
     _flussConfig
   }

-  protected def admin: Admin = {
-    Preconditions.checkNotNull(_admin, "Fluss Admin is not initialized.")
-    _admin
-  }
-
   /**
    * Gets the Fluss Admin client. This is a public accessor for procedures and other external
    * components that need admin access.
@@ -66,7 +61,10 @@ trait WithFlussAdmin extends AutoCloseable {
    * @return
    *   the Admin instance
    */
-  def getAdmin: Admin = admin
+  def admin: Admin = {
+    Preconditions.checkNotNull(_admin, "Fluss Admin is not initialized.")
+    _admin
+  }

   override def close(): Unit = {
     IOUtils.closeQuietly(_admin, "fluss-admin")

diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala
index f7f07b475..ef70f89e9 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala
@@ -97,32 +97,15 @@ abstract class BaseProcedure(tableCatalog: TableCatalog) extends Procedure {
    * @return
    *   the Admin instance
    */
-  protected def getAdmin(): Admin = {
+  protected def admin: Admin = {
     tableCatalog match {
-      case withAdmin: WithFlussAdmin => withAdmin.getAdmin
+      case withAdmin: WithFlussAdmin => withAdmin.admin
       case _ =>
         throw new IllegalStateException(
           s"Catalog does not support Fluss admin: ${tableCatalog.getClass.getName}")
     }
   }

-  /**
-   * Gets the Fluss Admin client from a SparkTable.
-   *
-   * @param table
-   *   the SparkTable instance
-   * @return
-   *   the Admin instance
-   */
-  protected def getAdmin(table: SparkTable): Admin = {
-    table match {
-      case abstractTable: AbstractSparkTable => abstractTable.admin
-      case _ =>
-        throw new IllegalArgumentException(
-          s"Table is not an AbstractSparkTable: ${table.getClass.getName}")
-    }
-  }
-
   /**
    * Creates a new InternalRow with the given values.
    *

diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedure.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedure.scala
index cdf3403da..15c9886ee 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedure.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedure.scala
@@ -65,7 +65,6 @@ class GetClusterConfigsProcedure(tableCatalog: TableCatalog) extends BaseProcedu

   private def getConfigs(configKeys: Array[String]): Array[InternalRow] = {
     try {
-      val admin = getAdmin()
       val configs = admin.describeClusterConfigs().get().asScala

       if (configKeys.isEmpty) {
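Note on the three Scala hunks above: they collapse the old lookup paths (protected admin plus getAdmin on the trait, getAdmin()/getAdmin(table) on procedures) into a single admin accessor, so the null check lives in one place and procedures always go through the catalog. A minimal sketch of the resulting call path, using simplified stand-ins for the real Admin and TableCatalog types from fluss-client and Spark's connector API:

    // Sketch only: Admin and TableCatalog are stand-ins, not the real Fluss/Spark types.
    trait Admin extends AutoCloseable
    trait TableCatalog

    trait WithFlussAdmin extends AutoCloseable {
      // Backing field, set when the catalog connects to the Fluss cluster.
      protected var _admin: Admin = null

      // The single public accessor; the null check lives here and nowhere else.
      def admin: Admin = {
        require(_admin != null, "Fluss Admin is not initialized.")
        _admin
      }

      override def close(): Unit = if (_admin != null) _admin.close()
    }

    abstract class BaseProcedure(tableCatalog: TableCatalog) {
      // Procedures reach the client only through the catalog, mirroring
      // BaseProcedure.admin in the diff above.
      protected def admin: Admin = tableCatalog match {
        case withAdmin: WithFlussAdmin => withAdmin.admin
        case _ =>
          throw new IllegalStateException(
            s"Catalog does not support Fluss admin: ${tableCatalog.getClass.getName}")
      }
    }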
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSparkSqlParser.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSparkSqlParser.scala
index 2080d4da1..8f5470855 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSparkSqlParser.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSparkSqlParser.scala
@@ -23,7 +23,6 @@ import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.types.{DataType, StructType}

@@ -73,15 +72,14 @@ class FlussSparkSqlParser(delegate: ParserInterface) extends ParserInterface {
   }

   private def parse[T](sqlText: String)(
-      toResult: org.apache.spark.sql.catalyst.parser.FlussSparkSqlParserParser => T): T = {
-    val lexer = new FlussSparkSqlParserLexer(
-      new UpperCaseCharStream(CharStreams.fromString(sqlText)))
+      toResult: org.apache.spark.sql.catalyst.parser.FlussSqlExtensionParser => T): T = {
+    val lexer = new FlussSqlExtensionLexer(new UpperCaseCharStream(CharStreams.fromString(sqlText)))
     lexer.removeErrorListeners()
     lexer.addErrorListener(FlussParseErrorListener)

     val tokenStream = new CommonTokenStream(lexer)
     val parser =
-      new org.apache.spark.sql.catalyst.parser.FlussSparkSqlParserParser(tokenStream)
+      new org.apache.spark.sql.catalyst.parser.FlussSqlExtensionParser(tokenStream)
     parser.removeErrorListeners()
     parser.addErrorListener(FlussParseErrorListener)

diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSqlAstBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSqlAstBuilder.scala
index 053c23ea0..4cfa0e986 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSqlAstBuilder.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSqlAstBuilder.scala
@@ -20,12 +20,10 @@ package org.apache.spark.sql.catalyst.parser
 import org.apache.fluss.spark.catalyst.plans.logical._

 import org.antlr.v4.runtime._
-import org.antlr.v4.runtime.misc.Interval
 import org.antlr.v4.runtime.tree.{ParseTree, TerminalNode}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.parser.FlussSparkSqlParserParser._
-import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.parser.FlussSqlExtensionParser._
 import org.apache.spark.sql.catalyst.plans.logical._

 import scala.collection.JavaConverters._

@@ -37,7 +35,7 @@ import scala.collection.JavaConverters._
  * The main Spark SQL parser.
  */
 class FlussSqlAstBuilder(delegate: ParserInterface)
-  extends FlussSparkSqlParserBaseVisitor[AnyRef]
+  extends FlussSqlExtensionBaseVisitor[AnyRef]
   with Logging {

   /** Creates a single statement of extension statements.
    */
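Note: FlussSqlExtensionLexer, FlussSqlExtensionParser, and FlussSqlExtensionBaseVisitor are all generated by the ANTLR plugin from the renamed grammar, which is why only class names change in the two files above. A stripped-down sketch of the lexer/parser wiring, assuming the generated classes are on the classpath (the real parse method also installs FlussParseErrorListener and wraps the input in an UpperCaseCharStream so keywords match case-insensitively):

    import org.antlr.v4.runtime.{CharStreams, CommonTokenStream}

    object ParseSketch {
      // Standard ANTLR pipeline: characters -> lexer -> token stream -> parser.
      def parse[T](sqlText: String)(toResult: FlussSqlExtensionParser => T): T = {
        val lexer = new FlussSqlExtensionLexer(CharStreams.fromString(sqlText))
        val tokens = new CommonTokenStream(lexer)
        val parser = new FlussSqlExtensionParser(tokens)
        toResult(parser)
      }
    }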
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedureTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedureTest.scala
index a2b0c4396..1f5fa4f94 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedureTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedureTest.scala
@@ -18,11 +18,8 @@
 package org.apache.fluss.spark.procedure

 import org.apache.fluss.config.ConfigOptions
-import org.apache.fluss.config.cluster.{AlterConfig, AlterConfigOpType}
 import org.apache.fluss.spark.FlussSparkTestBase

-import scala.collection.JavaConverters._
-
 class GetClusterConfigsProcedureTest extends FlussSparkTestBase {

   test("get_cluster_configs: get all configurations") {
@@ -41,6 +38,10 @@ class GetClusterConfigsProcedureTest extends FlussSparkTestBase {
       assert(row.getString(1) != null)
       assert(row.getString(2) != null)
     }
+
+    val result2 =
+      sql(s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => array())").collect()
+    assertResult(result)(result2)
   }

   test("get_cluster_configs: get specific configuration") {
@@ -50,15 +51,12 @@ class GetClusterConfigsProcedureTest extends FlussSparkTestBase {
         s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => array('$testKey'))").collect()

     assert(result.length == 1)
-    val row = result.head
-    assert(row.getString(0) == testKey)
-    assert(row.getString(1) != null)
-    assert(row.getString(2) != null)
+    assert(result.head.toString() == "[kv.snapshot.interval,1 s,STATIC]")
   }

   test("get_cluster_configs: get multiple configurations") {
     val key1 = ConfigOptions.KV_SNAPSHOT_INTERVAL.key()
-    val key2 = ConfigOptions.REMOTE_DATA_DIR.key()
+    val key2 = ConfigOptions.BIND_LISTENERS.key()

     val result =
       sql(s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => array('$key1', '$key2'))")
@@ -65,16 +63,11 @@ class GetClusterConfigsProcedureTest extends FlussSparkTestBase {
         .collect()

     assert(result.length == 2)

-    val keys = result.map(_.getString(0)).toSet
-    assert(keys.contains(key1))
-    assert(keys.contains(key2))
-
-    result.foreach {
-      row =>
-        assert(row.getString(1) != null)
-        assert(row.getString(2) != null)
-    }
+    // convert the result into a map of key to value for easy verification, key is the first column
+    val kvMap: Map[String, String] = result.map(r => r.getString(0) -> r.toString).toMap
+    assert(kvMap.getOrElse(key1, "") == s"[$key1,1 s,STATIC]")
+    assert(kvMap.getOrElse(key2, "") == s"[$key2,FLUSS://localhost:0,STATIC]")
   }

   test("get_cluster_configs: get non-existent configuration") {
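Usage note (not part of the commit): the assertions above show that each result row comes back as [key, value, source]. A hypothetical spark-shell invocation, where the catalog name "fluss" is an illustrative stand-in for whatever $DEFAULT_CATALOG resolves to in FlussSparkTestBase:

    // Hypothetical call; assumes a SparkSession `spark` with a Fluss catalog
    // registered under the name "fluss".
    val rows = spark
      .sql("CALL fluss.sys.get_cluster_configs(config_keys => array('kv.snapshot.interval'))")
      .collect()
    rows.foreach(println) // e.g. [kv.snapshot.interval,1 s,STATIC]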
