This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit af547f31456de9d242298be7f6c26b7fc9ef41bb Author: forwardxu <[email protected]> AuthorDate: Sun Jan 18 13:31:26 2026 +0800 [spark] Support Spark CALL procedure command framework (#2408) --- fluss-spark/fluss-spark-common/pom.xml | 22 ++ .../FlussSparkSqlParser.g4 | 230 +++++++++++++++++++++ .../fluss/spark/FlussSparkSessionExtensions.scala | 39 ++++ .../org/apache/fluss/spark/SparkCatalog.scala | 25 ++- .../org/apache/fluss/spark/SparkProcedures.scala | 40 ++++ .../fluss/spark/catalog/SupportsProcedures.scala | 29 +++ .../fluss/spark/catalog/WithFlussAdmin.scala | 9 + .../catalyst/analysis/FlussProcedureResolver.scala | 189 +++++++++++++++++ .../catalyst/plans/logical/FlussCallCommand.scala | 52 +++++ .../spark/exception/NoSuchProcedureException.scala | 33 +++ .../fluss/spark/execution/CallProcedureExec.scala | 95 +++++++++ .../fluss/spark/execution/FlussStrategy.scala | 36 ++++ .../fluss/spark/procedure/BaseProcedure.scala | 176 ++++++++++++++++ .../procedure/GetClusterConfigsProcedure.scala | 140 +++++++++++++ .../apache/fluss/spark/procedure/Procedure.scala | 69 +++++++ .../fluss/spark/procedure/ProcedureBuilder.scala | 47 +++++ .../fluss/spark/procedure/ProcedureParameter.scala | 61 ++++++ .../sql/catalyst/parser/FlussSparkSqlParser.scala | 142 +++++++++++++ .../sql/catalyst/parser/FlussSqlAstBuilder.scala | 145 +++++++++++++ .../apache/fluss/spark/FlussSparkTestBase.scala | 1 + .../spark/extensions/CallStatementParserTest.scala | 154 ++++++++++++++ .../procedure/GetClusterConfigsProcedureTest.scala | 121 +++++++++++ website/blog/2024-11-29-fluss-open-source.md | 2 +- website/blog/2025-06-01-partial-updates.md | 4 +- website/blog/releases/0.6.md | 2 +- website/blog/releases/0.7.md | 8 +- website/blog/releases/0.8.md | 16 +- website/docs/engine-spark/_category_.json | 4 + website/docs/engine-spark/procedures.md | 107 ++++++++++ website/docs/maintenance/operations/rebalance.md | 2 +- .../tiered-storage/lakehouse-storage.md | 2 +- website/src/pages/index.tsx 
| 2 +- 32 files changed, 1983 insertions(+), 21 deletions(-) diff --git a/fluss-spark/fluss-spark-common/pom.xml b/fluss-spark/fluss-spark-common/pom.xml index e285b8bbc..95b7c3bfb 100644 --- a/fluss-spark/fluss-spark-common/pom.xml +++ b/fluss-spark/fluss-spark-common/pom.xml @@ -41,10 +41,32 @@ <artifactId>spark-catalyst_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> + + <dependency> + <groupId>org.antlr</groupId> + <artifactId>antlr4-runtime</artifactId> + <version>4.9.3</version> + </dependency> </dependencies> <build> <plugins> + <plugin> + <groupId>org.antlr</groupId> + <artifactId>antlr4-maven-plugin</artifactId> + <version>4.9.3</version> + <executions> + <execution> + <goals> + <goal>antlr4</goal> + </goals> + </execution> + </executions> + <configuration> + <visitor>true</visitor> + </configuration> + </plugin> + <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> diff --git a/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSparkSqlParser.g4 b/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSparkSqlParser.g4 new file mode 100644 index 000000000..d377e677c --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser/FlussSparkSqlParser.g4 @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +grammar FlussSparkSqlParser; + +@lexer::members { + public boolean isValidDecimal() { + int nextChar = _input.LA(1); + if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= '9' || + nextChar == '_') { + return false; + } else { + return true; + } + } + + public boolean isHint() { + int nextChar = _input.LA(1); + if (nextChar == '+') { + return true; + } else { + return false; + } + } +} + +singleStatement + : statement ';'* EOF + ; + +statement + : CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call + ; + +callArgument + : expression #positionalArgument + | identifier '=>' expression #namedArgument + ; + +expression + : booleanExpression + ; + +booleanExpression + : predicated + | NOT booleanExpression + | EXISTS '(' query ')' + | booleanExpression AND booleanExpression + | booleanExpression OR booleanExpression + ; + +predicated + : valueExpression + ; + +valueExpression + : primaryExpression + ; + +primaryExpression + : constant #constantDefault + | functionName '(' (expression (',' expression)*)? ')' #functionCall + | '(' expression ')' #parenthesizedExpression + ; + +functionName + : multipartIdentifier + | identifier + ; + +query + : .+? + ; + +constant + : number #numericLiteral + | booleanValue #booleanLiteral + | STRING+ #stringLiteral + | identifier STRING #typeConstructor + ; + +booleanValue + : TRUE | FALSE + ; + +number + : MINUS? EXPONENT_VALUE #exponentLiteral + | MINUS? DECIMAL_VALUE #decimalLiteral + | MINUS? INTEGER_VALUE #integerLiteral + | MINUS? 
BIGINT_LITERAL #bigIntLiteral + | MINUS? SMALLINT_LITERAL #smallIntLiteral + | MINUS? TINYINT_LITERAL #tinyIntLiteral + | MINUS? DOUBLE_LITERAL #doubleLiteral + | MINUS? FLOAT_LITERAL #floatLiteral + | MINUS? BIGDECIMAL_LITERAL #bigDecimalLiteral + ; + +multipartIdentifier + : parts+=identifier ('.' parts+=identifier)* + ; + +identifier + : IDENTIFIER #unquotedIdentifier + | quotedIdentifier #quotedIdentifierAlternative + | nonReserved #unquotedIdentifier + ; + +quotedIdentifier + : BACKQUOTED_IDENTIFIER + ; + +nonReserved + : CALL | TRUE | FALSE | NOT | AND | OR | EXISTS + ; + +// Keywords +CALL: 'CALL'; +TRUE: 'TRUE'; +FALSE: 'FALSE'; +NOT: 'NOT'; +AND: 'AND'; +OR: 'OR'; +EXISTS: 'EXISTS'; + +// Operators +MINUS: '-'; + +// Literals +STRING + : '\'' ( ~('\''|'\\') | ('\\' .) )* '\'' + | '"' ( ~('"'|'\\') | ('\\' .) )* '"' + ; + +BIGINT_LITERAL + : INTEGER_VALUE 'L' + ; + +SMALLINT_LITERAL + : INTEGER_VALUE 'S' + ; + +TINYINT_LITERAL + : INTEGER_VALUE 'Y' + ; + +INTEGER_VALUE + : DIGIT+ + ; + +EXPONENT_VALUE + : DIGIT+ EXPONENT + | DECIMAL_DIGITS EXPONENT {isValidDecimal()}? + ; + +DECIMAL_VALUE + : DECIMAL_DIGITS {isValidDecimal()}? + ; + +FLOAT_LITERAL + : DIGIT+ EXPONENT? 'F' + | DECIMAL_DIGITS EXPONENT? 'F' {isValidDecimal()}? + ; + +DOUBLE_LITERAL + : DIGIT+ EXPONENT? 'D' + | DECIMAL_DIGITS EXPONENT? 'D' {isValidDecimal()}? + ; + +BIGDECIMAL_LITERAL + : DIGIT+ EXPONENT? 'BD' + | DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}? + ; + +IDENTIFIER + : (LETTER | DIGIT | '_')+ + ; + +BACKQUOTED_IDENTIFIER + : '`' ( ~'`' | '``' )* '`' + ; + +fragment DECIMAL_DIGITS + : DIGIT+ '.' DIGIT* + | '.' DIGIT+ + ; + +fragment EXPONENT + : 'E' [+-]? DIGIT+ + ; + +fragment DIGIT + : [0-9] + ; + +fragment LETTER + : [A-Z] + ; + +// Whitespace and comments +SIMPLE_COMMENT + : '--' ('\\\n' | ~[\r\n])* '\r'? '\n'? -> channel(HIDDEN) + ; + +BRACKETED_COMMENT + : '/*' {!isHint()}? (BRACKETED_COMMENT|.)*? 
'*/' -> channel(HIDDEN) + ; + +WS + : [ \r\n\t]+ -> channel(HIDDEN) + ; + +// Catch-all for any characters we didn't match +UNRECOGNIZED + : . + ; diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/FlussSparkSessionExtensions.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/FlussSparkSessionExtensions.scala new file mode 100644 index 000000000..dc12ed503 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/FlussSparkSessionExtensions.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark + +import org.apache.fluss.spark.catalyst.analysis.FlussProcedureResolver +import org.apache.fluss.spark.execution.FlussStrategy + +import org.apache.spark.sql.SparkSessionExtensions +import org.apache.spark.sql.catalyst.parser.FlussSparkSqlParser + +/** Spark session extensions for Fluss. 
*/ +class FlussSparkSessionExtensions extends (SparkSessionExtensions => Unit) { + + override def apply(extensions: SparkSessionExtensions): Unit = { + // parser extensions + extensions.injectParser { case (_, parser) => new FlussSparkSqlParser(parser) } + + // analyzer extensions + extensions.injectResolutionRule(spark => FlussProcedureResolver(spark)) + + // planner extensions + extensions.injectPlannerStrategy(spark => FlussStrategy(spark)) + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala index 045063e4f..53426a4d1 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala @@ -19,7 +19,9 @@ package org.apache.fluss.spark import org.apache.fluss.exception.{DatabaseNotExistException, TableAlreadyExistException, TableNotExistException} import org.apache.fluss.metadata.TablePath -import org.apache.fluss.spark.catalog.{SupportsFlussNamespaces, WithFlussAdmin} +import org.apache.fluss.spark.catalog.{SupportsFlussNamespaces, SupportsProcedures, WithFlussAdmin} +import org.apache.fluss.spark.exception.NoSuchProcedureException +import org.apache.fluss.spark.procedure.{Procedure, ProcedureBuilder} import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange} @@ -32,9 +34,14 @@ import java.util.concurrent.ExecutionException import scala.collection.JavaConverters._ -class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFlussAdmin { +class SparkCatalog + extends TableCatalog + with SupportsFlussNamespaces + with WithFlussAdmin + with SupportsProcedures { private var catalogName: String = "fluss" + private val 
SYSTEM_NAMESPACE = "sys" override def listTables(namespace: Array[String]): Array[Identifier] = { doNamespaceOperator(namespace) { @@ -116,6 +123,20 @@ class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFl override def name(): String = catalogName + override def loadProcedure(identifier: Identifier): Procedure = { + if (isSystemNamespace(identifier.namespace)) { + val builder: ProcedureBuilder = SparkProcedures.newBuilder(identifier.name) + if (builder != null) { + return builder.withTableCatalog(this).build() + } + } + throw new NoSuchProcedureException(s"Procedure not found: $identifier") + } + + private def isSystemNamespace(namespace: Array[String]): Boolean = { + namespace.length == 1 && namespace(0).equalsIgnoreCase(SYSTEM_NAMESPACE) + } + private def toTablePath(ident: Identifier): TablePath = { assert(ident.namespace().length == 1, "Only single namespace is supported") TablePath.of(ident.namespace().head, ident.name) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala new file mode 100644 index 000000000..12a4b0582 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark + +import org.apache.fluss.spark.procedure.{GetClusterConfigsProcedure, ProcedureBuilder} + +import java.util.Locale + +object SparkProcedures { + + private val BUILDERS: Map[String, () => ProcedureBuilder] = initProcedureBuilders() + + def newBuilder(name: String): ProcedureBuilder = { + val builderSupplier = BUILDERS.get(name.toLowerCase(Locale.ROOT)) + builderSupplier.map(_()).orNull + } + + def names(): Set[String] = BUILDERS.keySet + + private def initProcedureBuilders(): Map[String, () => ProcedureBuilder] = { + Map( + "get_cluster_configs" -> (() => GetClusterConfigsProcedure.builder()) + ) + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsProcedures.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsProcedures.scala new file mode 100644 index 000000000..cbc6adb1b --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/SupportsProcedures.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.catalog + +import org.apache.fluss.spark.exception.NoSuchProcedureException +import org.apache.fluss.spark.procedure.Procedure + +import org.apache.spark.sql.connector.catalog.Identifier + +trait SupportsProcedures { + + @throws[NoSuchProcedureException] + def loadProcedure(identifier: Identifier): Procedure +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala index 2241b6b96..3e7653cdf 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala @@ -59,6 +59,15 @@ trait WithFlussAdmin extends AutoCloseable { _admin } + /** + * Gets the Fluss Admin client. This is a public accessor for procedures and other external + * components that need admin access. 
+ * + * @return + * the Admin instance + */ + def getAdmin: Admin = admin + override def close(): Unit = { IOUtils.closeQuietly(_admin, "fluss-admin") IOUtils.closeQuietly(_connection, "fluss-connection"); diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/analysis/FlussProcedureResolver.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/analysis/FlussProcedureResolver.scala new file mode 100644 index 000000000..a4aa96e3d --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/analysis/FlussProcedureResolver.scala @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.spark.catalyst.analysis + +import org.apache.fluss.spark.catalog.SupportsProcedures +import org.apache.fluss.spark.catalyst.plans.logical._ +import org.apache.fluss.spark.procedure.ProcedureParameter + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier} + +import java.util.Locale + +/** Resolution rule for Fluss stored procedures. */ +case class FlussProcedureResolver(sparkSession: SparkSession) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case FlussCallStatement(nameParts, arguments) if nameParts.nonEmpty => + val (catalog, identifier) = resolveCatalogAndIdentifier(nameParts) + if (catalog == null || !catalog.isInstanceOf[SupportsProcedures]) { + throw new RuntimeException(s"Catalog ${nameParts.head} is not a SupportsProcedures") + } + + val procedureCatalog = catalog.asInstanceOf[SupportsProcedures] + val procedure = procedureCatalog.loadProcedure(identifier) + val parameters = procedure.parameters + val normalizedParameters = normalizeParameters(parameters) + validateParameters(normalizedParameters) + val normalizedArguments = normalizeArguments(arguments) + FlussCallCommand( + procedure, + args = buildArgumentExpressions(normalizedParameters, normalizedArguments)) + + case call @ FlussCallCommand(procedure, arguments) if call.resolved => + val parameters = procedure.parameters + val newArguments = arguments.zipWithIndex.map { + case (argument, index) => + val parameter = parameters(index) + val parameterType = parameter.dataType + val argumentType = argument.dataType + if (parameterType != argumentType && !Cast.canUpCast(argumentType, parameterType)) { + throw new RuntimeException( + s"Cannot cast $argumentType 
to $parameterType of ${parameter.name}.") + } + if (parameterType != argumentType) { + Cast(argument, parameterType) + } else { + argument + } + } + + if (newArguments != arguments) { + call.copy(args = newArguments) + } else { + call + } + } + + private def resolveCatalogAndIdentifier(nameParts: Seq[String]): (CatalogPlugin, Identifier) = { + val catalogManager = sparkSession.sessionState.catalogManager + if (nameParts.length == 2) { + val catalogName = nameParts.head + val procedureName = nameParts(1) + val catalog = catalogManager.catalog(catalogName) + (catalog, Identifier.of(Array("sys"), procedureName)) + } else if (nameParts.length == 3) { + val catalogName = nameParts.head + val namespace = nameParts(1) + val procedureName = nameParts(2) + val catalog = catalogManager.catalog(catalogName) + (catalog, Identifier.of(Array(namespace), procedureName)) + } else { + throw new RuntimeException(s"Invalid procedure name: ${nameParts.mkString(".")}") + } + } + + private def normalizeParameters(parameters: Seq[ProcedureParameter]): Seq[ProcedureParameter] = { + parameters.map { + parameter => + val normalizedName = parameter.name.toLowerCase(Locale.ROOT) + if (parameter.isRequired) { + ProcedureParameter.required(normalizedName, parameter.dataType) + } else { + ProcedureParameter.optional(normalizedName, parameter.dataType) + } + } + } + + private def validateParameters(parameters: Seq[ProcedureParameter]): Unit = { + val duplicateParamNames = parameters.groupBy(_.name).collect { + case (name, matchingParams) if matchingParams.length > 1 => name + } + if (duplicateParamNames.nonEmpty) { + throw new RuntimeException( + s"Parameter names ${duplicateParamNames.mkString("[", ",", "]")} are duplicated.") + } + parameters.sliding(2).foreach { + case Seq(previousParam, currentParam) + if !previousParam.isRequired && currentParam.isRequired => + throw new RuntimeException( + s"Optional parameters should be after required ones but $currentParam is after $previousParam.") + 
case _ => + } + } + + private def normalizeArguments(arguments: Seq[FlussCallArgument]): Seq[FlussCallArgument] = { + arguments.map { + case a @ FlussNamedArgument(name, _) => a.copy(name = name.toLowerCase(Locale.ROOT)) + case other => other + } + } + + private def buildArgumentExpressions( + parameters: Seq[ProcedureParameter], + arguments: Seq[FlussCallArgument]): Seq[Expression] = { + val nameToPositionMap = parameters.map(_.name).zipWithIndex.toMap + val nameToArgumentMap = buildNameToArgumentMap(parameters, arguments, nameToPositionMap) + val missingParamNames = parameters.filter(_.isRequired).collect { + case parameter if !nameToArgumentMap.contains(parameter.name) => parameter.name + } + if (missingParamNames.nonEmpty) { + throw new RuntimeException( + s"Required parameters ${missingParamNames.mkString("[", ",", "]")} are missing.") + } + val argumentExpressions = new Array[Expression](parameters.size) + nameToArgumentMap.foreach { + case (name, argument) => argumentExpressions(nameToPositionMap(name)) = argument.expr + } + parameters.foreach { + case parameter if !parameter.isRequired && !nameToArgumentMap.contains(parameter.name) => + argumentExpressions(nameToPositionMap(parameter.name)) = + Literal.create(null, parameter.dataType) + case _ => + } + argumentExpressions.toSeq + } + + private def buildNameToArgumentMap( + parameters: Seq[ProcedureParameter], + arguments: Seq[FlussCallArgument], + nameToPositionMap: Map[String, Int]): Map[String, FlussCallArgument] = { + val isNamedArgument = arguments.exists(_.isInstanceOf[FlussNamedArgument]) + val isPositionalArgument = arguments.exists(_.isInstanceOf[FlussPositionalArgument]) + + if (isNamedArgument && isPositionalArgument) { + throw new RuntimeException("Cannot mix named and positional arguments.") + } + + if (isNamedArgument) { + val namedArguments = arguments.asInstanceOf[Seq[FlussNamedArgument]] + val validationErrors = namedArguments.groupBy(_.name).collect { + case (name, procedureArguments) if 
procedureArguments.size > 1 => + s"Procedure argument $name is duplicated." + case (name, _) if !nameToPositionMap.contains(name) => s"Argument $name is unknown." + } + if (validationErrors.nonEmpty) { + throw new RuntimeException(s"Invalid arguments: ${validationErrors.mkString(", ")}") + } + namedArguments.map(arg => arg.name -> arg).toMap + } else { + if (arguments.size > parameters.size) { + throw new RuntimeException("Too many arguments for procedure") + } + arguments.zipWithIndex.map { + case (argument, position) => + val param = parameters(position) + param.name -> argument + }.toMap + } + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/plans/logical/FlussCallCommand.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/plans/logical/FlussCallCommand.scala new file mode 100644 index 000000000..f0ea0265d --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/plans/logical/FlussCallCommand.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.spark.catalyst.plans.logical + +import org.apache.fluss.spark.procedure.Procedure + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.LeafCommand +import org.apache.spark.sql.catalyst.util.truncatedString + +/** A CALL statement that needs to be resolved to a procedure. */ +case class FlussCallStatement(name: Seq[String], args: Seq[FlussCallArgument]) extends LeafCommand { + override def output: Seq[Attribute] = Seq.empty +} + +/** Base trait for CALL statement arguments. */ +sealed trait FlussCallArgument { + def expr: Expression +} + +/** A positional argument in a stored procedure call. */ +case class FlussPositionalArgument(expr: Expression) extends FlussCallArgument + +/** A named argument in a stored procedure call. */ +case class FlussNamedArgument(name: String, expr: Expression) extends FlussCallArgument + +/** A CALL command that has been resolved to a specific procedure. */ +case class FlussCallCommand(procedure: Procedure, args: Seq[Expression]) extends LeafCommand { + + override lazy val output: Seq[Attribute] = + procedure.outputType.map( + field => AttributeReference(field.name, field.dataType, field.nullable, field.metadata)()) + + override def simpleString(maxFields: Int): String = { + s"Call${truncatedString(output, "[", ", ", "]", maxFields)} ${procedure.description}" + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/exception/NoSuchProcedureException.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/exception/NoSuchProcedureException.scala new file mode 100644 index 000000000..c27adf422 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/exception/NoSuchProcedureException.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.exception + +/** + * Exception for trying to execute a procedure that doesn't exist. + * + * @param message + * the error message + * @param cause + * the underlying cause (optional) + * @since 0.9 + */ +class NoSuchProcedureException(message: String, cause: Throwable) + extends RuntimeException(message, cause) { + + def this(message: String) = this(message, null) +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/CallProcedureExec.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/CallProcedureExec.scala new file mode 100644 index 000000000..7af4ca3ec --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/CallProcedureExec.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.execution + +import org.apache.fluss.spark.procedure.Procedure + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow} +import org.apache.spark.sql.execution.LeafExecNode + +/** + * Physical plan node for executing a stored procedure. + * + * This implementation extends LeafExecNode for simpler command-style execution without requiring + * RDD-based distributed processing. This approach is more appropriate for procedures which are + * typically synchronous administrative operations. + * + * Benefits over SparkPlan-based implementation: + * - No need to create and manage RDDs explicitly + * - Simpler code without parallelize() overhead + * - Better semantic match for command-style operations + * - Consistent with other catalog implementations (e.g., Apache Paimon) + * + * @param output + * the output attributes of the procedure result + * @param procedure + * the procedure to execute + * @param args + * the argument expressions to pass to the procedure + */ +case class CallProcedureExec(output: Seq[Attribute], procedure: Procedure, args: Seq[Expression]) + extends LeafExecNode { + + override def simpleString(maxFields: Int): String = { + s"CallProcedure ${procedure.description()}" + } + + /** + * Execute the procedure and collect results efficiently. + * + * This is the primary execution method that directly returns results without unnecessary RDD + * creation overhead. 
+ */ + override def executeCollect(): Array[InternalRow] = { + // Evaluate all argument expressions + val argumentValues = new Array[Any](args.length) + args.zipWithIndex.foreach { + case (arg, index) => + argumentValues(index) = arg.eval(null) + } + + // Package arguments and invoke procedure + val argRow = new GenericInternalRow(argumentValues) + procedure.call(argRow) + } + + override def executeTake(limit: Int): Array[InternalRow] = { + executeCollect().take(limit) + } + + override def executeTail(limit: Int): Array[InternalRow] = { + val results = executeCollect() + results.takeRight(limit) + } + + /** + * Execute the procedure and return results as an RDD. + * + * This method is required by the SparkPlan interface but simply wraps the executeCollect() + * results in an RDD for compatibility with the execution framework. + */ + override protected def doExecute(): org.apache.spark.rdd.RDD[InternalRow] = { + sparkContext.parallelize(executeCollect(), 1) + } + + override def withNewChildrenInternal( + newChildren: IndexedSeq[org.apache.spark.sql.execution.SparkPlan]): CallProcedureExec = { + copy() + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/FlussStrategy.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/FlussStrategy.scala new file mode 100644 index 000000000..e4d3d4117 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/FlussStrategy.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.execution + +import org.apache.fluss.spark.catalyst.plans.logical.FlussCallCommand + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} + +/** Execution strategy for Fluss procedure calls. */ +case class FlussStrategy(spark: SparkSession) extends SparkStrategy { + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case call: FlussCallCommand => + CallProcedureExec(call.output, call.procedure, call.args) :: Nil + case _ => Nil + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala new file mode 100644 index 000000000..f7f07b475 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.procedure + +import org.apache.fluss.client.admin.Admin +import org.apache.fluss.metadata.TablePath +import org.apache.fluss.spark.SparkTable +import org.apache.fluss.spark.catalog.{AbstractSparkTable, WithFlussAdmin} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} + +/** + * Base class for Fluss stored procedures. + * + * This class provides common utility methods for procedure implementations, including identifier + * parsing, table loading, admin access, and result row construction. + * + * @param tableCatalog + * the Spark catalog that owns this procedure + */ +abstract class BaseProcedure(tableCatalog: TableCatalog) extends Procedure { + + /** + * Converts a string identifier to a Spark Identifier object. 
+ * + * @param identifierAsString + * the identifier string (e.g., "db.table" or "table") + * @param argName + * the parameter name for error reporting + * @return + * the Spark Identifier + */ + protected def toIdentifier(identifierAsString: String, argName: String): Identifier = { + if (identifierAsString == null || identifierAsString.isEmpty) { + throw new IllegalArgumentException(s"Cannot handle an empty identifier for argument $argName") + } + + val spark = SparkSession.active + val multipartIdentifier = identifierAsString.split("\\.") + + if (multipartIdentifier.length == 1) { + val defaultNamespace = spark.sessionState.catalogManager.currentNamespace + Identifier.of(defaultNamespace, multipartIdentifier(0)) + } else if (multipartIdentifier.length == 2) { + Identifier.of(Array(multipartIdentifier(0)), multipartIdentifier(1)) + } else { + throw new IllegalArgumentException( + s"Invalid identifier format for argument $argName: $identifierAsString") + } + } + + /** + * Loads a Spark table from the catalog. + * + * @param ident + * the table identifier + * @return + * the SparkTable instance + */ + protected def loadSparkTable(ident: Identifier): SparkTable = { + try { + val table = tableCatalog.loadTable(ident) + table match { + case sparkTable: SparkTable => sparkTable + case _ => + throw new IllegalArgumentException( + s"$ident is not a Fluss table: ${table.getClass.getName}") + } + } catch { + case e: Exception => + val errMsg = s"Couldn't load table '$ident' in catalog '${tableCatalog.name()}'" + throw new RuntimeException(errMsg, e) + } + } + + /** + * Gets the Fluss Admin client from the catalog. + * + * @return + * the Admin instance + */ + protected def getAdmin(): Admin = { + tableCatalog match { + case withAdmin: WithFlussAdmin => withAdmin.getAdmin + case _ => + throw new IllegalStateException( + s"Catalog does not support Fluss admin: ${tableCatalog.getClass.getName}") + } + } + + /** + * Gets the Fluss Admin client from a SparkTable. 
+ * + * @param table + * the SparkTable instance + * @return + * the Admin instance + */ + protected def getAdmin(table: SparkTable): Admin = { + table match { + case abstractTable: AbstractSparkTable => abstractTable.admin + case _ => + throw new IllegalArgumentException( + s"Table is not an AbstractSparkTable: ${table.getClass.getName}") + } + } + + /** + * Creates a new InternalRow with the given values. + * + * @param values + * the values for the row + * @return + * the InternalRow instance + */ + protected def newInternalRow(values: Any*): InternalRow = { + new GenericInternalRow(values.toArray) + } + + /** + * Converts a Spark Identifier to a Fluss TablePath. + * + * @param ident + * the Spark identifier + * @return + * the TablePath instance + */ + protected def toTablePath(ident: Identifier): TablePath = { + if (ident.namespace().length != 1) { + throw new IllegalArgumentException("Only single namespace is supported") + } + TablePath.of(ident.namespace()(0), ident.name()) + } +} + +object BaseProcedure { + + /** + * Abstract builder class for BaseProcedure implementations. 
+ * + * @tparam T + * the concrete procedure type to build + */ + abstract class Builder[T <: BaseProcedure] extends ProcedureBuilder { + private var tableCatalog: TableCatalog = _ + + override def withTableCatalog(newTableCatalog: TableCatalog): Builder[T] = { + this.tableCatalog = newTableCatalog + this + } + + override def build(): T = doBuild() + + protected def doBuild(): T + + protected def getTableCatalog: TableCatalog = tableCatalog + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedure.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedure.scala new file mode 100644 index 000000000..cdf3403da --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedure.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.spark.procedure + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.TableCatalog +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.JavaConverters._ + +/** + * Procedure to get cluster configuration(s). + * + * This procedure allows querying dynamic cluster configurations. It can retrieve: + * - Specific configurations by key(s) + * - All configurations (when no keys are provided) + * + * Usage examples: + * {{{ + * -- Get a specific configuration + * CALL sys.get_cluster_configs('kv.rocksdb.shared-rate-limiter.bytes-per-sec') + * + * -- Get all cluster configurations + * CALL sys.get_cluster_configs() + * }}} + */ +class GetClusterConfigsProcedure(tableCatalog: TableCatalog) extends BaseProcedure(tableCatalog) { + + override def parameters(): Array[ProcedureParameter] = { + GetClusterConfigsProcedure.PARAMETERS + } + + override def outputType(): StructType = { + GetClusterConfigsProcedure.OUTPUT_TYPE + } + + override def call(args: InternalRow): Array[InternalRow] = { + val configKeys = if (args.numFields > 0 && !args.isNullAt(0)) { + val keysArray = args.getArray(0) + (0 until keysArray.numElements()) + .map(i => keysArray.getUTF8String(i).toString) + .toArray + } else { + Array.empty[String] + } + + getConfigs(configKeys) + } + + private def getConfigs(configKeys: Array[String]): Array[InternalRow] = { + try { + val admin = getAdmin() + val configs = admin.describeClusterConfigs().get().asScala + + if (configKeys.isEmpty) { + configs + .map( + entry => + newInternalRow( + UTF8String.fromString(entry.key()), + UTF8String.fromString(entry.value()), + UTF8String.fromString(formatConfigSource(entry.source())) + )) + .toArray + } else { + val configEntryMap = configs.map(e => e.key() -> e).toMap + configKeys.flatMap { + key => + configEntryMap.get(key).map { + entry => + 
newInternalRow( + UTF8String.fromString(entry.key()), + UTF8String.fromString(entry.value()), + UTF8String.fromString(formatConfigSource(entry.source())) + ) + } + } + } + } catch { + case e: Exception => + throw new RuntimeException(s"Failed to get cluster config: ${e.getMessage}", e) + } + } + + private def formatConfigSource( + source: org.apache.fluss.config.cluster.ConfigEntry.ConfigSource): String = { + if (source == null) { + "UNKNOWN" + } else { + source.name() match { + case "DYNAMIC_SERVER_CONFIG" => "DYNAMIC" + case "INITIAL_SERVER_CONFIG" => "STATIC" + case _ => source.name() + } + } + } + + override def description(): String = { + "Retrieve cluster configuration values." + } +} + +object GetClusterConfigsProcedure { + + private val PARAMETERS: Array[ProcedureParameter] = Array( + ProcedureParameter.optional("config_keys", DataTypes.createArrayType(DataTypes.StringType)) + ) + + private val OUTPUT_TYPE: StructType = new StructType( + Array( + new StructField("config_key", DataTypes.StringType, nullable = false, Metadata.empty), + new StructField("config_value", DataTypes.StringType, nullable = false, Metadata.empty), + new StructField("config_source", DataTypes.StringType, nullable = false, Metadata.empty) + ) + ) + + def builder(): ProcedureBuilder = { + new BaseProcedure.Builder[GetClusterConfigsProcedure]() { + override protected def doBuild(): GetClusterConfigsProcedure = { + new GetClusterConfigsProcedure(getTableCatalog) + } + } + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/Procedure.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/Procedure.scala new file mode 100644 index 000000000..81faef57f --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/Procedure.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.procedure + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.StructType + +/** + * Interface for Fluss stored procedures that can be invoked via Spark SQL CALL statements. + * + * Procedures allow users to perform administrative and management operations through Spark SQL. All + * procedures are located in the `sys` namespace. + * + * Example usage: + * {{{ + * CALL catalog.sys.procedure_name(parameter => 'value') + * }}} + */ +trait Procedure { + + /** + * Returns the parameters accepted by this procedure. + * + * @return + * array of procedure parameters + */ + def parameters(): Array[ProcedureParameter] + + /** + * Returns the output schema of this procedure. + * + * @return + * the output StructType + */ + def outputType(): StructType + + /** + * Executes the procedure with the given arguments. + * + * @param args + * the argument values as an InternalRow + * @return + * array of result rows + */ + def call(args: InternalRow): Array[InternalRow] + + /** + * Returns a human-readable description of this procedure. 
+ * + * @return + * the procedure description + */ + def description(): String = getClass.toString +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureBuilder.scala new file mode 100644 index 000000000..b017edf7c --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureBuilder.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.procedure + +import org.apache.spark.sql.connector.catalog.TableCatalog + +/** + * Builder interface for creating Procedure instances. + * + * Procedure builders are used by the catalog to instantiate procedures with the necessary + * dependencies. + */ +trait ProcedureBuilder { + + /** + * Sets the table catalog that the procedure will use. + * + * @param tableCatalog + * the catalog instance + * @return + * this builder for method chaining + */ + def withTableCatalog(tableCatalog: TableCatalog): ProcedureBuilder + + /** + * Builds and returns the procedure instance. 
+ * + * @return + * the constructed Procedure + */ + def build(): Procedure +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureParameter.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureParameter.scala new file mode 100644 index 000000000..0c48e7631 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureParameter.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.procedure + +import org.apache.spark.sql.types.DataType + +/** + * Represents a parameter for a Fluss stored procedure. + * + * @param name + * the parameter name + * @param dataType + * the Spark SQL data type of the parameter + * @param isRequired + * whether this parameter is required + */ +case class ProcedureParameter(name: String, dataType: DataType, isRequired: Boolean = false) + +object ProcedureParameter { + + /** + * Creates a required procedure parameter. 
+ * + * @param name + * the parameter name + * @param dataType + * the parameter data type + * @return + * a required ProcedureParameter + */ + def required(name: String, dataType: DataType): ProcedureParameter = + ProcedureParameter(name, dataType, isRequired = true) + + /** + * Creates an optional procedure parameter. + * + * @param name + * the parameter name + * @param dataType + * the parameter data type + * @return + * an optional ProcedureParameter + */ + def optional(name: String, dataType: DataType): ProcedureParameter = + ProcedureParameter(name, dataType, isRequired = false) +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSparkSqlParser.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSparkSqlParser.scala new file mode 100644 index 000000000..2080d4da1 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSparkSqlParser.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.parser + +import org.antlr.v4.runtime._ +import org.antlr.v4.runtime.atn.PredictionMode +import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException} +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.types.{DataType, StructType} + +/** + * Parser extension for Fluss SQL extensions. + * + * @param delegate + * The main Spark SQL parser. + */ +class FlussSparkSqlParser(delegate: ParserInterface) extends ParserInterface { + + private lazy val astBuilder = new FlussSqlAstBuilder(delegate) + + override def parsePlan(sqlText: String): LogicalPlan = { + try { + parse(sqlText)(parser => astBuilder.visitSingleStatement(parser.singleStatement())) + } catch { + case _: ParseException | _: ParseCancellationException => + delegate.parsePlan(sqlText) + } + } + + override def parseQuery(sqlText: String): LogicalPlan = parsePlan(sqlText) + + override def parseExpression(sqlText: String): Expression = { + delegate.parseExpression(sqlText) + } + + override def parseTableIdentifier(sqlText: String): TableIdentifier = { + delegate.parseTableIdentifier(sqlText) + } + + override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = { + delegate.parseFunctionIdentifier(sqlText) + } + + override def parseMultipartIdentifier(sqlText: String): Seq[String] = { + delegate.parseMultipartIdentifier(sqlText) + } + + override def parseTableSchema(sqlText: String): StructType = { + delegate.parseTableSchema(sqlText) + } + + override def parseDataType(sqlText: String): DataType = { + delegate.parseDataType(sqlText) + } + + private def parse[T](sqlText: String)( + toResult: 
org.apache.spark.sql.catalyst.parser.FlussSparkSqlParserParser => T): T = { + val lexer = new FlussSparkSqlParserLexer( + new UpperCaseCharStream(CharStreams.fromString(sqlText))) + lexer.removeErrorListeners() + lexer.addErrorListener(FlussParseErrorListener) + + val tokenStream = new CommonTokenStream(lexer) + val parser = + new org.apache.spark.sql.catalyst.parser.FlussSparkSqlParserParser(tokenStream) + parser.removeErrorListeners() + parser.addErrorListener(FlussParseErrorListener) + + try { + try { + parser.getInterpreter.setPredictionMode(PredictionMode.SLL) + toResult(parser) + } catch { + case _: ParseCancellationException => + tokenStream.seek(0) + parser.reset() + parser.getInterpreter.setPredictionMode(PredictionMode.LL) + toResult(parser) + } + } catch { + case e: ParseException if e.command.isDefined => + throw e + case e: ParseException => + throw e.withCommand(sqlText) + case e: AnalysisException => + val position = org.apache.spark.sql.catalyst.trees.Origin(e.line, e.startPosition) + throw new ParseException(Option(sqlText), e.message, position, position) + } + } +} + +class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream { + override def consume(): Unit = wrapped.consume() + override def getSourceName: String = wrapped.getSourceName + override def index(): Int = wrapped.index() + override def mark(): Int = wrapped.mark() + override def release(marker: Int): Unit = wrapped.release(marker) + override def seek(where: Int): Unit = wrapped.seek(where) + override def size(): Int = wrapped.size() + + override def getText(interval: Interval): String = { + wrapped.getText(interval) + } + + override def LA(i: Int): Int = { + val la = wrapped.LA(i) + if (la == 0 || la == IntStream.EOF) la + else Character.toUpperCase(la) + } +} + +object FlussParseErrorListener extends BaseErrorListener { + override def syntaxError( + recognizer: Recognizer[_, _], + offendingSymbol: scala.Any, + line: Int, + charPositionInLine: Int, + msg: String, + e: 
RecognitionException): Unit = { + val position = org.apache.spark.sql.catalyst.trees.Origin(Some(line), Some(charPositionInLine)) + throw new ParseException(None, msg, position, position) + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSqlAstBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSqlAstBuilder.scala new file mode 100644 index 000000000..053c23ea0 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/FlussSqlAstBuilder.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.parser + +import org.apache.fluss.spark.catalyst.plans.logical._ + +import org.antlr.v4.runtime._ +import org.antlr.v4.runtime.misc.Interval +import org.antlr.v4.runtime.tree.{ParseTree, TerminalNode} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.parser.FlussSparkSqlParserParser._ +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical._ + +import scala.collection.JavaConverters._ + +/** + * The AST Builder for Fluss SQL extensions. + * + * @param delegate + * The main Spark SQL parser. + */ +class FlussSqlAstBuilder(delegate: ParserInterface) + extends FlussSparkSqlParserBaseVisitor[AnyRef] + with Logging { + + /** Creates a single statement of extension statements. */ + override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) { + visit(ctx.statement).asInstanceOf[LogicalPlan] + } + + /** Creates a [[FlussCallStatement]] for a stored procedure call. */ + override def visitCall(ctx: CallContext): FlussCallStatement = withOrigin(ctx) { + val name = + ctx.multipartIdentifier.parts.asScala.map(part => cleanIdentifier(part.getText)).toSeq + val args = ctx.callArgument.asScala.map(typedVisit[FlussCallArgument]).toSeq + FlussCallStatement(name, args) + } + + /** Creates a positional argument in a stored procedure call. */ + override def visitPositionalArgument(ctx: PositionalArgumentContext): FlussCallArgument = + withOrigin(ctx) { + val expression = typedVisit[Expression](ctx.expression) + FlussPositionalArgument(expression) + } + + /** Creates a named argument in a stored procedure call. 
 */ + override def visitNamedArgument(ctx: NamedArgumentContext): FlussCallArgument = withOrigin(ctx) { + val name = cleanIdentifier(ctx.identifier.getText) + val expression = typedVisit[Expression](ctx.expression) + FlussNamedArgument(name, expression) + } + + /** Creates an [[Expression]] for a positional or named argument. */ + override def visitExpression(ctx: ExpressionContext): Expression = { + val sqlString = reconstructSqlString(ctx) + delegate.parseExpression(sqlString) + } + + /** Returns a multi-part identifier as Seq[String]. */ + override def visitMultipartIdentifier(ctx: MultipartIdentifierContext): Seq[String] = + withOrigin(ctx) { + ctx.parts.asScala.map(part => cleanIdentifier(part.getText)).toSeq + } + + /** Remove backticks from identifier. */ + private def cleanIdentifier(ident: String): String = { + if (ident.startsWith("`") && ident.endsWith("`")) { + ident.substring(1, ident.length - 1) + } else { + ident + } + } + + private def reconstructSqlString(ctx: ParserRuleContext): String = { + ctx.children.asScala + .map { + case c: ParserRuleContext => reconstructSqlString(c) + case t: TerminalNode => t.getText + } + .mkString(" ") + } + + private def typedVisit[T](ctx: ParseTree): T = + ctx.accept(this).asInstanceOf[T] + + private def withOrigin[T](ctx: ParserRuleContext)(f: => T): T = { + val current = CurrentOrigin.get + CurrentOrigin.set(position(ctx.getStart)) + try { + f + } finally { + CurrentOrigin.set(current) + } + } + + private def position(token: Token): Origin = { + val opt = Option(token) + Origin(opt.map(_.getLine), opt.map(_.getCharPositionInLine)) + } +} + +case class Origin( + line: Option[Int] = None, + startPosition: Option[Int] = None, + startIndex: Option[Int] = None, + stopIndex: Option[Int] = None, + sqlText: Option[String] = None, + objectType: Option[String] = None, + objectName: Option[String] = None) + +object CurrentOrigin { + private val value = new ThreadLocal[Origin]() { + override def initialValue: Origin = Origin() + 
} + + def get: Origin = value.get() + def set(o: Origin): Unit = value.set(o) + def reset(): Unit = value.set(Origin()) + + def withOrigin[A](o: Origin)(f: => A): A = { + val previous = get + set(o) + val ret = + try f + finally { set(previous) } + ret + } +} diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala index 688ae872c..11287fb7d 100644 --- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala @@ -53,6 +53,7 @@ class FlussSparkTestBase extends QueryTest with SharedSparkSession { .set(s"spark.sql.catalog.$DEFAULT_CATALOG", classOf[SparkCatalog].getName) .set(s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers", flussServer.getBootstrapServers) .set("spark.sql.defaultCatalog", DEFAULT_CATALOG) + .set("spark.sql.extensions", classOf[FlussSparkSessionExtensions].getName) } override protected def beforeAll(): Unit = { diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/extensions/CallStatementParserTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/extensions/CallStatementParserTest.scala new file mode 100644 index 000000000..36024fd65 --- /dev/null +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/extensions/CallStatementParserTest.scala @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.extensions + +import org.apache.fluss.spark.FlussSparkSessionExtensions +import org.apache.fluss.spark.catalyst.plans.logical.{FlussCallArgument, FlussCallStatement, FlussNamedArgument, FlussPositionalArgument} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.types.DataTypes +import org.scalatest.{BeforeAndAfterEach, FunSuite} + +import java.math.BigDecimal +import java.sql.Timestamp +import java.time.Instant + +class CallStatementParserTest extends FunSuite with BeforeAndAfterEach { + + private var spark: SparkSession = _ + private var parser: ParserInterface = _ + + override def beforeEach(): Unit = { + super.beforeEach() + val optionalSession = SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession) + optionalSession.foreach(_.stop()) + SparkSession.clearActiveSession() + + spark = SparkSession + .builder() + .master("local[2]") + .config("spark.sql.extensions", classOf[FlussSparkSessionExtensions].getName) + .getOrCreate() + + parser = spark.sessionState.sqlParser + } + + override def afterEach(): Unit = { + if (spark != null) { + spark.stop() + spark = null + parser = null + } + super.afterEach() + } + + test("testCallWithBackticks") { + val call = + parser.parsePlan("CALL cat.`system`.`no_args_func`()").asInstanceOf[FlussCallStatement] + assert(call.name.toList == List("cat", "system", "no_args_func")) + assert(call.args.size == 0) + } + + 
test("testCallWithNamedArguments") { + val callStatement = parser + .parsePlan("CALL catalog.system.named_args_func(arg1 => 1, arg2 => 'test', arg3 => true)") + .asInstanceOf[FlussCallStatement] + + assert(callStatement.name.toList == List("catalog", "system", "named_args_func")) + assert(callStatement.args.size == 3) + assertArgument(callStatement, 0, Some("arg1"), 1, DataTypes.IntegerType) + assertArgument(callStatement, 1, Some("arg2"), "test", DataTypes.StringType) + assertArgument(callStatement, 2, Some("arg3"), true, DataTypes.BooleanType) + } + + test("testCallWithPositionalArguments") { + val callStatement = parser + .parsePlan( + "CALL catalog.system.positional_args_func(1, '${spark.sql.extensions}', 2L, true, 3.0D, 4.0e1, 500e-1BD, TIMESTAMP '2017-02-03T10:37:30.00Z')") + .asInstanceOf[FlussCallStatement] + + assert(callStatement.name.toList == List("catalog", "system", "positional_args_func")) + assert(callStatement.args.size == 8) + assertArgument(callStatement, 0, None, 1, DataTypes.IntegerType) + assertArgument( + callStatement, + 1, + None, + classOf[FlussSparkSessionExtensions].getName, + DataTypes.StringType) + assertArgument(callStatement, 2, None, 2L, DataTypes.LongType) + assertArgument(callStatement, 3, None, true, DataTypes.BooleanType) + assertArgument(callStatement, 4, None, 3.0, DataTypes.DoubleType) + assertArgument(callStatement, 5, None, 4.0e1, DataTypes.DoubleType) + assertArgument( + callStatement, + 6, + None, + new BigDecimal("500e-1"), + DataTypes.createDecimalType(3, 1)) + assertArgument( + callStatement, + 7, + None, + Timestamp.from(Instant.parse("2017-02-03T10:37:30.00Z")), + DataTypes.TimestampType) + } + + test("testCallWithMixedArguments") { + val callStatement = parser + .parsePlan("CALL catalog.system.mixed_args_func(arg1 => 1, 'test')") + .asInstanceOf[FlussCallStatement] + + assert(callStatement.name.toList == List("catalog", "system", "mixed_args_func")) + assert(callStatement.args.size == 2) + 
assertArgument(callStatement, 0, Some("arg1"), 1, DataTypes.IntegerType) + assertArgument(callStatement, 1, None, "test", DataTypes.StringType) + } + + test("testCallSimpleProcedure") { + val callStatement = parser + .parsePlan("CALL system.simple_procedure(table => 'db.table')") + .asInstanceOf[FlussCallStatement] + + assert(callStatement.name.toList == List("system", "simple_procedure")) + assert(callStatement.args.size == 1) + assertArgument(callStatement, 0, Some("table"), "db.table", DataTypes.StringType) + } + + private def assertArgument( + callStatement: FlussCallStatement, + index: Int, + expectedName: Option[String], + expectedValue: Any, + expectedType: org.apache.spark.sql.types.DataType): Unit = { + + val callArgument = callStatement.args(index) + + expectedName match { + case None => + assert(callArgument.isInstanceOf[FlussPositionalArgument]) + case Some(name) => + val namedArgument = callArgument.asInstanceOf[FlussNamedArgument] + assert(namedArgument.name == name) + } + + assert(callStatement.args(index).expr == Literal.create(expectedValue, expectedType)) + } +} diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedureTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedureTest.scala new file mode 100644 index 000000000..a2b0c4396 --- /dev/null +++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/procedure/GetClusterConfigsProcedureTest.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.procedure + +import org.apache.fluss.config.ConfigOptions +import org.apache.fluss.config.cluster.{AlterConfig, AlterConfigOpType} +import org.apache.fluss.spark.FlussSparkTestBase + +import scala.collection.JavaConverters._ + +class GetClusterConfigsProcedureTest extends FlussSparkTestBase { + + test("get_cluster_configs: get all configurations") { + val result = sql(s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs()").collect() + + assert(result.length > 0) + + val firstRow = result.head + assert(firstRow.length == 3) + assert( + firstRow.schema.fieldNames.sameElements(Array("config_key", "config_value", "config_source"))) + + result.foreach { + row => + assert(row.getString(0) != null) + assert(row.getString(1) != null) + assert(row.getString(2) != null) + } + } + + test("get_cluster_configs: get specific configuration") { + val testKey = ConfigOptions.KV_SNAPSHOT_INTERVAL.key() + + val result = sql( + s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => array('$testKey'))").collect() + + assert(result.length == 1) + val row = result.head + assert(row.getString(0) == testKey) + assert(row.getString(1) != null) + assert(row.getString(2) != null) + } + + test("get_cluster_configs: get multiple configurations") { + val key1 = ConfigOptions.KV_SNAPSHOT_INTERVAL.key() + val key2 = ConfigOptions.REMOTE_DATA_DIR.key() + + val result = + sql(s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => array('$key1', '$key2'))") + .collect() + + assert(result.length == 2) + + val keys = 
result.map(_.getString(0)).toSet + assert(keys.contains(key1)) + assert(keys.contains(key2)) + + result.foreach { + row => + assert(row.getString(1) != null) + assert(row.getString(2) != null) + } + } + + test("get_cluster_configs: get non-existent configuration") { + val nonExistentKey = "non.existent.config.key" + + val result = + sql(s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => array('$nonExistentKey'))") + .collect() + + assert(result.length == 0) + } + + test("get_cluster_configs: mixed existent and non-existent configurations") { + val existentKey = ConfigOptions.KV_SNAPSHOT_INTERVAL.key() + val nonExistentKey = "non.existent.config.key" + + val result = sql( + s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => array('$existentKey', '$nonExistentKey'))") + .collect() + + assert(result.length == 1) + assert(result.head.getString(0) == existentKey) + } + + test("get_cluster_configs: verify configuration source") { + val testKey = ConfigOptions.KV_SNAPSHOT_INTERVAL.key() + + val result = sql( + s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => array('$testKey'))").collect() + + assert(result.length == 1) + val row = result.head + val source = row.getString(2) + + assert(source == "DYNAMIC" || source == "STATIC" || source == "DEFAULT") + } + + test("get_cluster_configs: empty array parameter should return all configs") { + val result = + sql(s"CALL $DEFAULT_CATALOG.sys.get_cluster_configs(config_keys => array())").collect() + + assert(result.length > 0) + } +} diff --git a/website/blog/2024-11-29-fluss-open-source.md b/website/blog/2024-11-29-fluss-open-source.md index 3e3effb41..42e3df116 100644 --- a/website/blog/2024-11-29-fluss-open-source.md +++ b/website/blog/2024-11-29-fluss-open-source.md @@ -40,7 +40,7 @@ Make sure to keep an eye on the project, give it a try and if you like it, don ### Getting Started - Visit the [GitHub repository](https://github.com/apache/fluss). 
-- Check out the [quickstart guide](/docs/quickstart/flink/). +- Check out the [quickstart guide](/docs/next/quickstart/flink/). ### Additional Resources - Announcement Blog Post: [Introducing Fluss: Unified Streaming Storage For Next-Generation Data Analytics](https://www.ververica.com/blog/introducing-fluss) diff --git a/website/blog/2025-06-01-partial-updates.md b/website/blog/2025-06-01-partial-updates.md index 55317235e..29d60b8c8 100644 --- a/website/blog/2025-06-01-partial-updates.md +++ b/website/blog/2025-06-01-partial-updates.md @@ -265,7 +265,7 @@ Flink SQL> SELECT * FROM user_rec_wide; Now let's switch to `batch` mode and query the current snapshot of the `user_rec_wide` table. -But before that, let's start the [Tiering Service](/docs/maintenance/tiered-storage/lakehouse-storage/#start-the-datalake-tiering-service) that allows offloading the tables as `Lakehouse` tables. +But before that, let's start the [Tiering Service](/docs/next/maintenance/tiered-storage/lakehouse-storage/#start-the-datalake-tiering-service) that allows offloading the tables as `Lakehouse` tables. **Step 7:** Open a new terminal 💻 in the `Coordinator Server` and run the following command to start the `Tiering Service`: ```shell @@ -297,7 +297,7 @@ Flink SQL> SELECT * FROM user_rec_wide; ### Conclusion Partial updates in Fluss enable an alternative approach in how we design streaming data pipelines for enriching or joining data. -When all your sources share a primary key - otherwise you can mix & match [streaming lookup joins](/docs/engine-flink/lookups/#lookup) - you can turn the problem on its head: update a unified table incrementally, rather than joining streams on the fly. +When all your sources share a primary key - otherwise you can mix & match [streaming lookup joins](/docs/next/engine-flink/lookups/#lookup) - you can turn the problem on its head: update a unified table incrementally, rather than joining streams on the fly. 
The result is a more scalable, maintainable, and efficient pipeline. Engineers can spend less time wrestling with Flink’s state, checkpoints and join mechanics, and more time delivering fresh, integrated data to power real-time analytics and applications. diff --git a/website/blog/releases/0.6.md b/website/blog/releases/0.6.md index 287b5cb37..46b1a5624 100644 --- a/website/blog/releases/0.6.md +++ b/website/blog/releases/0.6.md @@ -184,7 +184,7 @@ SELECT * FROM fluss_left_table INNER JOIN fluss_right_table Flink performs lookups on Fluss tables using the Join Key, which serves as the Bucket Key for the Fluss table. This allows it to leverage the prefix index of the primary key in the Fluss table, enabling highly efficient lookup queries. This feature in Fluss is referred to as Prefix Lookup. Currently, Prefix Lookup can also be used to perform one-to-many lookup queries. -For more details, please refer to the [Prefix Lookup](/docs/engine-flink/lookups/#prefix-lookup) documentation. +For more details, please refer to the [Prefix Lookup](/docs/next/engine-flink/lookups/#prefix-lookup) documentation. ## Stability & Performance Improvements diff --git a/website/blog/releases/0.7.md b/website/blog/releases/0.7.md index a8ebf1bc0..74b3ba169 100644 --- a/website/blog/releases/0.7.md +++ b/website/blog/releases/0.7.md @@ -79,7 +79,7 @@ flink run /path/to/fluss-flink-tiering-0.7.0.jar \ --datalake.paimon.warehouse /path/to/warehouse ``` -See more details in the [Streaming Lakehouse documentation](/docs/maintenance/tiered-storage/lakehouse-storage/). +See more details in the [Streaming Lakehouse documentation](/docs/next/maintenance/tiered-storage/lakehouse-storage/). 
## Streaming Partition Pruning Partitioning is a foundational technique in modern data warehouses and Lakehouse architectures for optimizing query performance by @@ -131,7 +131,7 @@ CALL admin_catalog.sys.add_acl( ); ``` -For details, please refer to the [Security documentation](/docs/security/overview/) and quickstarts. +For details, please refer to the [Security documentation](/docs/next/security/overview/) and quickstarts. ## Flink DataStream Connector Fluss 0.7 officially introduces the DataStream Connector, supporting both Source and Sink for reading and writing log and primary key tables. Users can now seamlessly integrate Fluss tables into Flink DataStream pipelines. @@ -155,7 +155,7 @@ DataStreamSource<Order> stream = env.fromSource( ); ``` -For usage examples and configuration parameters, see the [DataStream Connector documentation](/docs/engine-flink/datastream/). +For usage examples and configuration parameters, see the [DataStream Connector documentation](/docs/next/engine-flink/datastream/). ## Fluss Java Client @@ -164,7 +164,7 @@ In this version, we officially release the Fluss Java Client, a client library d * **Table API:** For table-based data operations, supporting streaming reads/writes, updates, deletions, and point queries. * **Admin API:** For metadata management, including cluster management, table lifecycle, and access control. -The client supports forward and backward compatibility, ensuring smooth upgrades across Fluss versions. With the Fluss Java Client, developers can build online applications and data ingestion services based on Fluss, as well as enterprise-level components such as Fluss management platforms and operations monitoring systems. For detailed usage instructions, please refer to the official documentation: [Fluss Java Client User Guide](/docs/apis/java-client/). +The client supports forward and backward compatibility, ensuring smooth upgrades across Fluss versions. 
With the Fluss Java Client, developers can build online applications and data ingestion services based on Fluss, as well as enterprise-level components such as Fluss management platforms and operations monitoring systems. For detailed usage instructions, please refer to the official documentation: [Fluss Java Client User Guide](/docs/next/apis/java-client/). Fluss uses Apache Arrow as its underlying storage format, enabling efficient cross-language extensions. A **Fluss Python Client** is planned for future releases, leveraging the rich ecosystem of **PyArrow** to integrate with popular data analysis tools such as **Pandas** and **DuckDB**. This will further lower the barrier for real-time data exploration and analytics. diff --git a/website/blog/releases/0.8.md b/website/blog/releases/0.8.md index c302685af..963ab6749 100644 --- a/website/blog/releases/0.8.md +++ b/website/blog/releases/0.8.md @@ -51,7 +51,7 @@ datalake.iceberg.type: hadoop datalake.iceberg.warehouse: /path/to/iceberg ``` -You can find more detailed instructions in the [Iceberg Lakehouse documentation](/docs/streaming-lakehouse/integrate-data-lakes/iceberg/). +You can find more detailed instructions in the [Iceberg Lakehouse documentation](/docs/next/streaming-lakehouse/integrate-data-lakes/iceberg/). ## Real-Time Multimodal AI Analytics with Lance @@ -80,7 +80,7 @@ datalake.lance.access_key_id: <access_key_id> datalake.lance.secret_access_key: <secret_access_key> ``` -See the [LanceDB blog post](https://lancedb.com/blog/fluss-integration/) for the full integration. You also can find more detailed instructions in the [Lance Lakehouse documentation](/docs/streaming-lakehouse/integrate-data-lakes/lance/). +See the [LanceDB blog post](https://lancedb.com/blog/fluss-integration/) for the full integration. You also can find more detailed instructions in the [Lance Lakehouse documentation](/docs/next/streaming-lakehouse/integrate-data-lakes/lance/). 
## Flink 2.1 @@ -102,7 +102,7 @@ Below is a performance comparison (CPU, memory, state size, checkpoint interval)  -You can find more detailed instructions in the [Delta Join documentation](/docs/engine-flink/delta-joins/). +You can find more detailed instructions in the [Delta Join documentation](/docs/next/engine-flink/delta-joins/). ### Materialized Table @@ -135,7 +135,7 @@ WITH( ); ``` -You can find more detailed instructions in the [Materialized Table documentation](/docs/engine-flink/ddl/#materialized-table). +You can find more detailed instructions in the [Materialized Table documentation](/docs/next/engine-flink/ddl/#materialized-table). ## Stability @@ -144,7 +144,7 @@ Through continuous validation across multiple business units within Alibaba Grou These improvements substantially enhance Fluss’s robustness in mission-critical streaming use cases. Key improvements include: -- **[Graceful Shutdown](/docs/maintenance/operations/graceful-shutdown/)**: Fluss supports cluster rolling upgrade, and we introduced a graceful shutdown mechanism for TabletServers in this version. During shutdown, leadership is proactively migrated before termination, ensuring that read/write latency remains unaffected during rolling upgrades. +- **[Graceful Shutdown](/docs/next/maintenance/operations/graceful-shutdown/)**: Fluss supports cluster rolling upgrade, and we introduced a graceful shutdown mechanism for TabletServers in this version. During shutdown, leadership is proactively migrated before termination, ensuring that read/write latency remains unaffected during rolling upgrades. - **Accelerated Coordinator Event Processing**: Optimized the Coordinator’s event handling mechanism through asynchronous processing and batched ZooKeeper operations. As a result, all events are now processed in milliseconds. 
- **Faster Coordinator Recovery**: Parallelized initialization cuts Coordinator startup time from 10 minutes to just 20 seconds in production-scale benchmarks, this dramatically improves service availability and recovery speed. - **Optimized Server Metrics**: Refined metric granularity and reporting logic to reduce telemetry volume by 90% while preserving full observability. @@ -180,7 +180,7 @@ When you issue a `ALTER TABLE ... SET` command to update storage options on a ta This capability is especially useful for tuning performance, adapting to changing data patterns, or complying with evolving data governance requirements—all without service interruption. -You can find more detailed instructions in the [Updating Configs documentation](/docs/maintenance/operations/updating-configs/). +You can find more detailed instructions in the [Updating Configs documentation](/docs/next/maintenance/operations/updating-configs/). ## Helm Charts @@ -188,7 +188,7 @@ This release also introduced Helm Charts. With this addition, users can now depl The Helm chart simplifies provisioning, upgrades, and scaling by packaging configuration, manifests, and dependencies into a single, versioned release. This should help users running Fluss on Kubernetes faster, more reliably, and with easier integration into existing CI/CD and observability setups, significantly lowering the barrier for teams adopting Fluss in production. -You can find more detailed instructions in the [Deploying with Helm documentation](/docs/install-deploy/deploying-with-helm/). +You can find more detailed instructions in the [Deploying with Helm documentation](/docs/next/install-deploy/deploying-with-helm/). ## Java Version Upgrade @@ -214,7 +214,7 @@ The Fluss community is committed to delivering a smooth upgrade experience. This - Clients from version 0.7 can seamlessly connect to version 0.8 servers, - Clients from version 0.8 are also compatible with version 0.7 servers. 
-However, Fluss 0.8 is the first official release since the project entered the Apache Incubator, and it includes changes such as package path updates (e.g., groupId and Java package names). As a result, applications that depend on the Fluss SDK will need to make corresponding code adjustments when upgrading to version 0.8. Please refer to the [upgrade notes](/docs/maintenance/operations/upgrade-notes-0.8/) for a comprehensive list of adjustments to make and issues to check during the upg [...] +However, Fluss 0.8 is the first official release since the project entered the Apache Incubator, and it includes changes such as package path updates (e.g., groupId and Java package names). As a result, applications that depend on the Fluss SDK will need to make corresponding code adjustments when upgrading to version 0.8. Please refer to the [upgrade notes](/docs/next/maintenance/operations/upgrade-notes-0.8/) for a comprehensive list of adjustments to make and issues to check during th [...] For a detailed list of all changes in this release, please refer to the [release notes](https://github.com/apache/fluss/releases/tag/v0.8.0-incubating). diff --git a/website/docs/engine-spark/_category_.json b/website/docs/engine-spark/_category_.json new file mode 100644 index 000000000..e81d6a72f --- /dev/null +++ b/website/docs/engine-spark/_category_.json @@ -0,0 +1,4 @@ +{ + "label": "Engine Spark", + "position": 7 +} diff --git a/website/docs/engine-spark/procedures.md b/website/docs/engine-spark/procedures.md new file mode 100644 index 000000000..0d8b01047 --- /dev/null +++ b/website/docs/engine-spark/procedures.md @@ -0,0 +1,107 @@ +--- +sidebar_label: Procedures +title: Procedures +sidebar_position: 3 +--- + +# Procedures + +Fluss provides stored procedures to perform administrative and management operations through Spark SQL. All procedures are located in the `sys` namespace and can be invoked using the `CALL` statement. 
+ +## Configuration + +To enable Fluss procedures in Spark, you need to configure the Spark session extensions: + +```scala +spark.conf.set("spark.sql.extensions", "org.apache.fluss.spark.FlussSparkSessionExtensions") +``` + +Or in `spark-defaults.conf`: + +```properties +spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions +``` + +## Syntax + +The general syntax for calling a procedure is: + +```sql +CALL [catalog_name.]sys.procedure_name( + parameter_name => 'value', + another_parameter => 'value' +) +``` + +### Argument Passing + +Procedures support two ways to pass arguments: + +1. **Named Arguments** (recommended): + ```sql + CALL catalog.sys.procedure_name(parameter => 'value') + ``` + +2. **Positional Arguments**: + ```sql + CALL catalog.sys.procedure_name('value') + ``` + +Note: You cannot mix named and positional arguments in a single procedure call. + +## Cluster Configuration Procedures + +Fluss provides procedures to dynamically manage cluster configurations without requiring a server restart. + +### get_cluster_configs + +Retrieve cluster configuration values. + +**Syntax:** + +```sql +CALL [catalog_name.]sys.get_cluster_configs() +CALL [catalog_name.]sys.get_cluster_configs(config_keys => ARRAY('key1', 'key2')) +``` + +**Parameters:** + +- `config_keys` (optional): Array of configuration keys to retrieve. If omitted, returns all cluster configurations. 
+ +**Returns:** A table with columns: + +- `config_key`: The configuration key name +- `config_value`: The current value +- `config_source`: The source of the configuration (e.g., `DEFAULT`, `DYNAMIC`, `STATIC`) + +**Example:** + +```sql title="Spark SQL" +-- Use the Fluss catalog (replace 'fluss_catalog' with your catalog name if different) +USE fluss_catalog; + +-- Get all cluster configurations +CALL sys.get_cluster_configs(); + +-- Get specific configurations +CALL sys.get_cluster_configs(config_keys => ARRAY('kv.rocksdb.shared-rate-limiter.bytes-per-sec', 'datalake.format')); +``` + +## Error Handling + +Procedures will throw exceptions in the following cases: + +- **Missing Required Parameters**: If a required parameter is not provided +- **Invalid Procedure Name**: If the specified procedure does not exist +- **Type Mismatch**: If a parameter value cannot be converted to the expected type +- **Permission Denied**: If the user does not have permission to perform the operation + +## Implementation Notes + +- Procedures are executed synchronously and return results immediately +- The `sys` namespace is reserved for system procedures +- Custom procedures can be added by implementing the `Procedure` interface + +## See Also + +- [Flink Procedures](../../engine-flink/procedures) diff --git a/website/docs/maintenance/operations/rebalance.md b/website/docs/maintenance/operations/rebalance.md index 48d89ac40..eafb19827 100644 --- a/website/docs/maintenance/operations/rebalance.md +++ b/website/docs/maintenance/operations/rebalance.md @@ -210,7 +210,7 @@ public class RebalanceExample { ## Using Flink Stored Procedures -For rebalancing operations, Fluss provides convenient Flink stored procedures that can be called directly from Flink SQL. 
See [Rebalance Procedures](/docs/engine-flink/procedures.md#rebalance-procedures) for detailed documentation on using the following procedures: +For rebalancing operations, Fluss provides convenient Flink stored procedures that can be called directly from Flink SQL. See [Rebalance Procedures](../../../engine-flink/procedures#rebalance-procedures) for detailed documentation on using the following procedures: - **add_server_tag**: Tag servers before rebalancing - **remove_server_tag**: Remove tags after rebalancing diff --git a/website/docs/maintenance/tiered-storage/lakehouse-storage.md b/website/docs/maintenance/tiered-storage/lakehouse-storage.md index 35b2394d7..89237c31c 100644 --- a/website/docs/maintenance/tiered-storage/lakehouse-storage.md +++ b/website/docs/maintenance/tiered-storage/lakehouse-storage.md @@ -51,7 +51,7 @@ For example: - If you are using Paimon filesystem catalog with OSS filesystem, you need to put `paimon-oss-<paimon_version>.jar` into directory `${FLUSS_HOME}/plugins/paimon/`. - If you are using Paimon Hive catalog, you need to put [the flink sql hive connector jar](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/hive/overview/#using-bundled-hive-jar) into directory `${FLUSS_HOME}/plugins/paimon/`. -Additionally, when using Paimon with HDFS, you must also configure the Fluss server with the Hadoop environment. See the [HDFS setup guide](/docs/maintenance/filesystems/hdfs.md) for detailed instructions. +Additionally, when using Paimon with HDFS, you must also configure the Fluss server with the Hadoop environment. See the [HDFS setup guide](../../filesystems/hdfs) for detailed instructions. ### Start The Datalake Tiering Service Then, you must start the datalake tiering service to tier Fluss's data to the lakehouse storage. 
diff --git a/website/src/pages/index.tsx b/website/src/pages/index.tsx index 06aa2fcfc..5cf202b53 100644 --- a/website/src/pages/index.tsx +++ b/website/src/pages/index.tsx @@ -38,7 +38,7 @@ function HomepageHeader() { <div className={styles.buttons}> <Link className={clsx("hero_button button button--primary button--lg", styles.buttonWidth)} - to="/docs/quickstart/flink"> + to="/docs/next/quickstart/flink"> Quick Start </Link>
