This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5141-7a6c22a09b0efd60cee35e106dbbc66de2a73a55 in repository https://gitbox.apache.org/repos/asf/texera.git
commit 7deed35fddf58bc910991af920f0cf4d16c46f9b
Author: carloea2 <[email protected]>
AuthorDate: Thu Jun 4 11:49:59 2026 -0700

feat(workflow-operator): add Python UDF UI parameter injection model (#5141)

### What changes were proposed in this PR?

This PR adds the Scala backend foundation for Python UDF UI parameters.

It introduces:

| Area | Change |
| --- | --- |
| UI parameter model | Adds `UiUDFParameter`, containing backend-compatible `attribute` metadata and an editable `value`. |
| Python UDF injector | Adds `PythonUdfUiParameterInjector`, which validates UI parameters and injects a reserved hook method into supported Python UDF classes. |
| Safe string encoding | Marks `Attribute.getName()` as encodable so UI parameter names are safely rendered through the Python template builder. |
| Test coverage | Adds Scala tests for hook injection, validation, unsupported types, reserved method conflicts, and unchanged behavior when no UI parameters exist. |

This PR is stacked after the merged frontend foundation PR #5043. It does not yet wire the injector into operator execution; that wiring is handled by later PRs in the stack. Existing Python UDF workflow execution remains unchanged in this PR because `PythonUDFOpDescV2`, `PythonUDFSourceOpDescV2`, and `DualInputPortsPythonUDFOpDescV2` are not modified here.

### Any related issues, documentation, discussions?

Part of the Python UDF UI parameter feature split from `feat/ui-parameter`.

Related tracking issue / stack: #5044

Stack order:

1. Frontend UI parameter building blocks: #5043
2. Scala backend injection model: this PR
3. Python runtime support
4. End-to-end integration

### How was this PR tested?

Commands run:

```bash
sbt "WorkflowOperator / Test / testOnly org.apache.texera.amber.operator.udf.python.PythonUdfUiParameterInjectorSpec"
sbt scalafmtAll
sbt scalafmtCheckAll "scalafixAll --check"
```

Results:

- `PythonUdfUiParameterInjectorSpec`: 10 tests passed.
- `scalafmtAll`: no file changes.
- `scalafmtCheckAll` and `scalafixAll --check`: passed. ### Was this PR authored or co-authored using generative AI tooling? No --------- Co-authored-by: Xiaozhen Liu <[email protected]> --- .../texera/amber/pybuilder/PythonLexerUtils.scala | 62 +++++ .../amber/pybuilder/PythonLexerUtilsSpec.scala | 31 +++ .../apache/texera/amber/core/tuple/Attribute.java | 2 + .../udf/python/PythonUdfUiParameterInjector.scala | 205 ++++++++++++++++ .../amber/operator/udf/python/UiUDFParameter.scala | 47 ++++ .../python/PythonUdfUiParameterInjectorSpec.scala | 262 +++++++++++++++++++++ 6 files changed, 609 insertions(+) diff --git a/common/pybuilder/src/main/scala/org/apache/texera/amber/pybuilder/PythonLexerUtils.scala b/common/pybuilder/src/main/scala/org/apache/texera/amber/pybuilder/PythonLexerUtils.scala index 08aac3a9e8..75442f759d 100644 --- a/common/pybuilder/src/main/scala/org/apache/texera/amber/pybuilder/PythonLexerUtils.scala +++ b/common/pybuilder/src/main/scala/org/apache/texera/amber/pybuilder/PythonLexerUtils.scala @@ -37,6 +37,68 @@ object PythonLexerUtils { if (lastNewlineIndex >= 0) s.substring(lastNewlineIndex + 1) else s } + /** + * Update triple-quoted-string state after scanning one physical Python source line. + * + * This is intentionally lightweight. It only tracks whether scanning is inside a `'''` or `"""` string so callers + * that reason about indentation can avoid treating string contents as real Python statements. + * + * Known limitations: escaped delimiters inside an active triple-quoted string are still treated as closing + * delimiters, and delimiter-like runs next to ordinary string boundaries may be detected because this helper does + * not fully parse Python string literal adjacency. 
+ */ + def updateTripleQuotedStringState( + line: String, + activeDelimiter: Option[String] + ): Option[String] = { + var delimiter = activeDelimiter + var inSingleQuotedString = false + var inDoubleQuotedString = false + var escaped = false + var index = 0 + + while (index < line.length) { + delimiter match { + case Some(active) => + if (line.startsWith(active, index)) { + delimiter = None + index += active.length + } else { + index += 1 + } + + case None => + val char = line.charAt(index) + + if (escaped) { + escaped = false + index += 1 + } else if ((inSingleQuotedString || inDoubleQuotedString) && char == '\\') { + escaped = true + index += 1 + } else if (!inSingleQuotedString && !inDoubleQuotedString && char == '#') { + return delimiter + } else if (!inDoubleQuotedString && line.startsWith("'''", index)) { + delimiter = Some("'''") + index += 3 + } else if (!inSingleQuotedString && line.startsWith("\"\"\"", index)) { + delimiter = Some("\"\"\"") + index += 3 + } else if (!inDoubleQuotedString && char == '\'') { + inSingleQuotedString = !inSingleQuotedString + index += 1 + } else if (!inSingleQuotedString && char == '"') { + inDoubleQuotedString = !inDoubleQuotedString + index += 1 + } else { + index += 1 + } + } + } + + delimiter + } + /** * Detect whether the provided line tail contains an unclosed single or double quote. 
* diff --git a/common/pybuilder/src/test/scala/org/apache/texera/amber/pybuilder/PythonLexerUtilsSpec.scala b/common/pybuilder/src/test/scala/org/apache/texera/amber/pybuilder/PythonLexerUtilsSpec.scala index ea473969e7..0939d2c4a6 100644 --- a/common/pybuilder/src/test/scala/org/apache/texera/amber/pybuilder/PythonLexerUtilsSpec.scala +++ b/common/pybuilder/src/test/scala/org/apache/texera/amber/pybuilder/PythonLexerUtilsSpec.scala @@ -100,6 +100,37 @@ class PythonLexerUtilsSpec extends AnyFunSuite { assert(PythonLexerUtils.lineTail(text) == "") } + // -------- updateTripleQuotedStringState -------- + + test("updateTripleQuotedStringState: enters and exits triple single quoted strings") { + val opened = PythonLexerUtils.updateTripleQuotedStringState("sql = '''", None) + assert(opened.contains("'''")) + + val stillOpen = PythonLexerUtils.updateTripleQuotedStringState("SELECT * FROM t", opened) + assert(stillOpen.contains("'''")) + + val closed = PythonLexerUtils.updateTripleQuotedStringState("'''", stillOpen) + assert(closed.isEmpty) + } + + test("updateTripleQuotedStringState: enters and exits triple double quoted strings") { + val opened = PythonLexerUtils.updateTripleQuotedStringState("sql = \"\"\"", None) + assert(opened.contains("\"\"\"")) + + val closed = PythonLexerUtils.updateTripleQuotedStringState("\"\"\"", opened) + assert(closed.isEmpty) + } + + test("updateTripleQuotedStringState: ignores triple quotes in single-line comments") { + val state = PythonLexerUtils.updateTripleQuotedStringState("# ''' not a string", None) + assert(state.isEmpty) + } + + test("updateTripleQuotedStringState: ignores triple quotes inside ordinary strings") { + val state = PythonLexerUtils.updateTripleQuotedStringState("value = \"'''\"", None) + assert(state.isEmpty) + } + // -------- hasUnclosedQuote -------- test("hasUnclosedQuote: empty string has no unclosed quote") { diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java index 84d52fddce..fb434e0875 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java @@ -21,6 +21,7 @@ package org.apache.texera.amber.core.tuple; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.texera.amber.pybuilder.EncodableStringAnnotation; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; @@ -49,6 +50,7 @@ public class Attribute implements Serializable { @JsonProperty(value = "attributeName", required = true) @NotBlank(message = "Attribute name is required") + @EncodableStringAnnotation public String getName() { return attributeName; } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala new file mode 100644 index 0000000000..13c25a436e --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.texera.amber.operator.udf.python + +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType} +import org.apache.texera.amber.pybuilder.PythonLexerUtils.updateTripleQuotedStringState +import org.apache.texera.amber.pybuilder.PythonTemplateBuilder +import org.apache.texera.amber.pybuilder.PythonTemplateBuilder.PythonTemplateBuilderStringContext + +import scala.util.matching.Regex + +/** + * Injects the reserved UI-parameter hook into user-written Python UDF code. + * + * Operator descriptors should call this after loading saved [[UiUDFParameter]] values and before sending Python source + * to runtime execution. The injected hook returns decoded parameter names and values that Python runtime support reads + * before the user's `open()` method runs. + */ +object PythonUdfUiParameterInjector { + + private val InjectedUiParametersHookMethodName = "_texera_injected_ui_parameters" + private val InjectedUiParametersHookMethodHeader = + s"def $InjectedUiParametersHookMethodName(self) -> Dict[str, Any]:" + private val UnsupportedUiParameterTypes = Set(AttributeType.BINARY, AttributeType.LARGE_BINARY) + + // Keep supported user-facing UDF class names in sync with the frontend parser. 
+ private val SupportedPythonUdfClassHeaderRegex: Regex = + """(?m)^([ \t]*)class\s+(ProcessTupleOperator|ProcessBatchOperator|ProcessTableOperator|GenerateOperator)\s*\([^)]*\)\s*:\s*(?:#.*)?$""".r + + private def validate(uiParameters: List[UiUDFParameter]): Unit = { + val attributes = uiParameters.map(parameterAttribute) + attributes.foreach(validateSupportedType) + + attributes + .groupBy(_.getName) + .collectFirst { + case (parameterName, matchingAttributes) if matchingAttributes.size > 1 => parameterName + } + .foreach { duplicateName => + throw new RuntimeException(s"UiParameter name '$duplicateName' is declared more than once.") + } + } + + private def parameterAttribute(parameter: UiUDFParameter): Attribute = + Option(parameter).flatMap(parameter => Option(parameter.attribute)).getOrElse { + throw new RuntimeException("UiParameter attribute is required.") + } + + private def validateSupportedType(attribute: Attribute): Unit = { + if (UnsupportedUiParameterTypes.contains(attribute.getType)) { + throw new RuntimeException( + s"UiParameter type '${attribute.getType.name()}' is not supported. " + + "Use string, integer, long, double, boolean, or timestamp instead." + ) + } + } + + private def buildInjectedParameterEntry(parameter: UiUDFParameter): PythonTemplateBuilder = { + pyb"${parameter.attribute.getName}: ${parameter.value}" + } + + private def buildInjectedParametersMap( + uiParameters: List[UiUDFParameter] + ): PythonTemplateBuilder = { + val entries = uiParameters.map(buildInjectedParameterEntry) + entries.reduceOption((acc, entry) => acc + pyb", " + entry).getOrElse(pyb"") + } + + private def buildInjectedHookMethod(uiParameters: List[UiUDFParameter]): String = { + val injectedParametersMap = buildInjectedParametersMap(uiParameters) + + (pyb"""|# Follow-up runtime support exports Dict/Any and defines the base hook that @overrides targets. 
+ |@overrides + |$InjectedUiParametersHookMethodHeader + | return {""" + + injectedParametersMap + + pyb"""} + |""").encode + } + + private def indentBlock(block: String, indent: String): String = { + block + .split("\n", -1) + .map { line => + if (line.nonEmpty) indent + line else line + } + .mkString("\n") + } + + private def lineEndIndex(text: String, from: Int): Int = { + val lineEnd = text.indexOf('\n', from) + if (lineEnd < 0) text.length else lineEnd + } + + private def detectClassBlockEnd(code: String, classHeaderStart: Int, classIndent: String): Int = { + val classLineEnd = lineEndIndex(code, classHeaderStart) + var lineStart = if (classLineEnd < code.length) classLineEnd + 1 else code.length + var tripleQuotedStringDelimiter: Option[String] = None + + while (lineStart < code.length) { + val lineEnd = lineEndIndex(code, lineStart) + val line = code.substring(lineStart, lineEnd) + + val trimmed = line.trim + val isBlank = trimmed.isEmpty + + val currentIndentLen = line.segmentLength(ch => ch == ' ' || ch == '\t') + val classIndentLen = classIndent.length + + if (tripleQuotedStringDelimiter.isEmpty && !isBlank && currentIndentLen <= classIndentLen) { + return lineStart + } + + tripleQuotedStringDelimiter = updateTripleQuotedStringState(line, tripleQuotedStringDelimiter) + + lineStart = if (lineEnd < code.length) lineEnd + 1 else code.length + } + + code.length + } + + private def containsReservedHook(classBlock: String): Boolean = { + val hookRegex = + ("""(?m)^[ \t]+def\s+""" + Regex.quote(InjectedUiParametersHookMethodName) + """\s*\(""").r + hookRegex.findFirstIn(classBlock).isDefined + } + + private def injectHookIntoUserClass(userCode: String, hookMethod: String): String = { + val classHeaderMatch = + SupportedPythonUdfClassHeaderRegex.findFirstMatchIn(userCode).getOrElse { + throw new RuntimeException( + "UiParameters were provided, but no supported Python UDF class was found. 
" + + "Use one of ProcessTupleOperator, ProcessBatchOperator, ProcessTableOperator, or GenerateOperator." + ) + } + + val classHeaderStart = classHeaderMatch.start + val classIndent = classHeaderMatch.group(1) + val classBlockEnd = detectClassBlockEnd(userCode, classHeaderStart, classIndent) + + val classBlock = userCode.substring(classHeaderStart, classBlockEnd) + + if (containsReservedHook(classBlock)) { + throw new RuntimeException( + s"Reserved method '$InjectedUiParametersHookMethodName' is already defined in the UDF class. Please rename your method." + ) + } + + val bodyIndent = inferClassBodyIndent(classBlock, classIndent).getOrElse(classIndent + " ") + val indentedHook = indentBlock( + (if (classBlock.endsWith("\n")) "" else "\n") + hookMethod.trim + "\n", + bodyIndent + ) + + userCode.substring(0, classBlockEnd) + + indentedHook + + userCode.substring(classBlockEnd) + } + + private def inferClassBodyIndent(classBlock: String, classIndent: String): Option[String] = { + val lines = classBlock.split("\n", -1).toList.drop(1) + + lines.collectFirst { + case line if line.trim.nonEmpty => + val leading = line.takeWhile(ch => ch == ' ' || ch == '\t') + if (leading.length > classIndent.length) leading else classIndent + " " + } + } + + /** + * Returns Python code with the UI-parameter hook injected into the supported UDF class. + * + * If `uiParameters` is empty, the code is returned unchanged. Throws [[RuntimeException]] when parameter metadata is + * invalid, the user already defines the reserved hook method, or parameters are provided for an unsupported class. 
+ */ + def inject(code: String, uiParameters: List[UiUDFParameter]): String = { + val parameters = Option(uiParameters).getOrElse(List.empty) + validate(parameters) + + val userCode = Option(code).getOrElse("") + + if (parameters.isEmpty) { + return userCode + } + + val hookMethod = buildInjectedHookMethod(parameters) + injectHookIntoUserClass(userCode, hookMethod) + } +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala new file mode 100644 index 0000000000..b18b9a181d --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.amber.operator.udf.python + +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.tuple.Attribute +import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableString + +import javax.validation.Valid +import javax.validation.constraints.NotNull + +/** + * Serialized operator property for one Python UDF UI parameter. + * + * `attribute` carries the inferred parameter name and type. `value` is user-entered text and is marked as + * [[EncodableString]] so Python code generation decodes it at runtime instead of embedding raw text into generated code. + */ +class UiUDFParameter { + + @JsonProperty(required = true) + @JsonSchemaTitle("Attribute") + @Valid + @NotNull(message = "Attribute is required") + var attribute: Attribute = _ + + @JsonProperty() + @JsonSchemaTitle("Value") + var value: EncodableString = "" +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala new file mode 100644 index 0000000000..d5a2534758 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.texera.amber.operator.udf.python + +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class PythonUdfUiParameterInjectorSpec extends AnyFlatSpec with Matchers { + + private def uiParameter( + attributeName: String, + attributeType: AttributeType, + value: String + ): UiUDFParameter = { + val parameter = new UiUDFParameter + parameter.attribute = new Attribute(attributeName, attributeType) + parameter.value = value + parameter + } + + private def inject(parameters: UiUDFParameter*): String = + PythonUdfUiParameterInjector.inject(baseUdfCode, parameters.toList) + + private def inject(code: String, parameters: UiUDFParameter*): String = + PythonUdfUiParameterInjector.inject(code, parameters.toList) + + private def decoderCallCount(code: String): Int = + code.sliding("self.decode_python_template".length).count(_ == "self.decode_python_template") + + private val baseUdfCode: String = + """from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | @overrides + | def open(self): + | print("open") + | + | @overrides + | def process_tuple(self, tuple_: Tuple, port: int): + | yield tuple_ + |""".stripMargin + + it should "return user code unchanged when there are no UI parameters" in { + val injectedCode = inject() + + injectedCode should include("class ProcessTupleOperator(UDFOperatorV2):") + injectedCode should include("""print("open")""") + injectedCode should not include 
("_texera_injected_ui_parameters") + injectedCode should not include ("self.decode_python_template") + injectedCode should not include ("import typing") + } + + it should "return unsupported user code unchanged when there are no UI parameters" in { + val nonSupportedCode = + """from pytexera import * + | + |class SomethingElse: + | def open(self): + | pass + |""".stripMargin + + inject(nonSupportedCode) shouldBe nonSupportedCode + } + + it should "preserve user source lines that look like Scala stripMargin input" in { + val udfCodeWithPipeLine = + """from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | def open(self): + | pattern = "keep" + | text = ''' + | |do not strip this line + |''' + | + | def process_tuple(self, tuple_: Tuple, port: int): + | yield tuple_ + |""".stripMargin + + val injectedCode = inject(udfCodeWithPipeLine, uiParameter("k", AttributeType.STRING, "v")) + + injectedCode should include(" |do not strip this line") + injectedCode should include("def _texera_injected_ui_parameters(self) -> Dict[str, Any]:") + } + + it should "inject UI parameter hook into supported UDF class using Dict and Any from pytexera" in { + val injectedCode = inject(uiParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z")) + + injectedCode should include("class ProcessTupleOperator(UDFOperatorV2):") + injectedCode should include( + "# Follow-up runtime support exports Dict/Any and defines the base hook that @overrides targets." 
+ ) + injectedCode should include("def _texera_injected_ui_parameters(self) -> Dict[str, Any]:") + injectedCode should include("return {") + injectedCode should include("self.decode_python_template") + decoderCallCount(injectedCode) shouldBe 2 + injectedCode should include("""print("open")""") + injectedCode should not include ("import typing") + injectedCode should not include ("typing.Dict") + injectedCode should not include ("typing.Any") + } + + it should "append the reserved hook inside the class before the next top-level statement" in { + val udfCodeWithSiblingDefinition = + """from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | @overrides + | def open(self): + | print("open") + | + | @overrides + | def process_tuple(self, tuple_: Tuple, port: int): + | yield tuple_ + | + |def helper(): + | return "outside" + |""".stripMargin + + val injectedCode = + inject(udfCodeWithSiblingDefinition, uiParameter("k", AttributeType.STRING, "v")) + + val hookIndex = injectedCode.indexOf("def _texera_injected_ui_parameters(self)") + val processTupleIndex = + injectedCode.indexOf("def process_tuple(self, tuple_: Tuple, port: int):") + val helperIndex = injectedCode.indexOf("def helper():") + + hookIndex should be >= 0 + processTupleIndex should be < hookIndex + helperIndex should be > hookIndex + } + + it should "append the reserved hook after triple-quoted strings that contain top-level-looking lines" in { + val udfCodeWithTripleQuotedString = + """from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | def process_tuple(self, tuple_: Tuple, port: int): + | sql = ''' + |SELECT * FROM t + |''' + | yield tuple_ + | + |def helper(): + | return "outside" + |""".stripMargin + + val injectedCode = + inject(udfCodeWithTripleQuotedString, uiParameter("k", AttributeType.STRING, "v")) + + val hookIndex = injectedCode.indexOf("def _texera_injected_ui_parameters(self)") + val stringEndIndex = injectedCode.indexOf("'''\n yield tuple_") + val 
helperIndex = injectedCode.indexOf("def helper():") + + stringEndIndex should be >= 0 + stringEndIndex should be < hookIndex + hookIndex should be < helperIndex + } + + it should "preserve multiple UI parameters in the injected map" in { + val injectedCode = inject( + uiParameter("param1", AttributeType.DOUBLE, "12.5"), + uiParameter("param2", AttributeType.INTEGER, "1"), + uiParameter("param3", AttributeType.STRING, "Hola"), + uiParameter("param4", AttributeType.TIMESTAMP, "2026-02-28T03:15:00Z") + ) + + injectedCode should include("def _texera_injected_ui_parameters(self) -> Dict[str, Any]:") + injectedCode should include("self.decode_python_template") + decoderCallCount(injectedCode) shouldBe 8 + injectedCode should not include ("import typing") + } + + it should "throw when a parameter attribute is missing" in { + val invalidParameter = new UiUDFParameter + invalidParameter.attribute = null + invalidParameter.value = "anything" + + val exception = the[RuntimeException] thrownBy { + inject(invalidParameter) + } + + exception.getMessage should include("UiParameter attribute is required") + } + + it should "throw when a UI parameter name is duplicated" in { + val exception = the[RuntimeException] thrownBy { + inject( + uiParameter("date", AttributeType.STRING, "2024-01-01"), + uiParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z") + ) + } + + exception.getMessage should include("UiParameter name 'date' is declared more than once") + } + + Seq(AttributeType.BINARY, AttributeType.LARGE_BINARY).foreach { unsupportedType => + it should s"throw when a UI parameter uses ${unsupportedType.name()} type" in { + val exception = the[RuntimeException] thrownBy { + inject(uiParameter("payload", unsupportedType, "68656c6c6f")) + } + + exception.getMessage should include( + s"UiParameter type '${unsupportedType.name()}' is not supported" + ) + } + } + + it should "throw when the reserved hook is already defined by the user" in { + val udfWithReservedHook = + 
"""from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | def _texera_injected_ui_parameters(self): + | return {} + | + | def open(self): + | pass + |""".stripMargin + + val exception = the[RuntimeException] thrownBy { + inject(udfWithReservedHook, uiParameter("k", AttributeType.STRING, "v")) + } + + exception.getMessage should include( + "Reserved method '_texera_injected_ui_parameters' is already defined" + ) + } + + it should "throw when UI parameters are provided but no supported user class is present" in { + val nonSupportedCode = + """from pytexera import * + | + |class SomethingElse: + | def open(self): + | pass + |""".stripMargin + + val exception = the[RuntimeException] thrownBy { + inject(nonSupportedCode, uiParameter("k", AttributeType.STRING, "v")) + } + + exception.getMessage should include("no supported Python UDF class was found") + } +}
