This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 7deed35fdd feat(workflow-operator): add Python UDF UI parameter
injection model (#5141)
7deed35fdd is described below
commit 7deed35fddf58bc910991af920f0cf4d16c46f9b
Author: carloea2 <[email protected]>
AuthorDate: Thu Jun 4 11:49:59 2026 -0700
feat(workflow-operator): add Python UDF UI parameter injection model (#5141)
### What changes were proposed in this PR?
This PR adds the Scala backend foundation for Python UDF UI parameters.
It introduces:
| Area | Change |
| --- | --- |
| UI parameter model | Adds `UiUDFParameter`, containing backend-compatible `attribute` metadata and an editable `value`. |
| Python UDF injector | Adds `PythonUdfUiParameterInjector`, which validates UI parameters and injects a reserved hook method into supported Python UDF classes. |
| Safe string encoding | Marks `Attribute.getName()` as encodable so UI parameter names are safely rendered through the Python template builder. |
| Test coverage | Adds Scala tests for hook injection, validation, unsupported types, reserved method conflicts, and unchanged behavior when no UI parameters exist. |
This PR is stacked after the merged frontend foundation PR #5043. It
does not yet wire the injector into operator execution; that wiring is
handled by later PRs in the stack.
Existing Python UDF workflow execution remains unchanged in this PR
because `PythonUDFOpDescV2`, `PythonUDFSourceOpDescV2`, and
`DualInputPortsPythonUDFOpDescV2` are not modified here.
### Any related issues, documentation, discussions?
Part of the Python UDF UI parameter feature split from
`feat/ui-parameter`.
Related tracking issue / stack: #5044
Stack order:
1. Frontend UI parameter building blocks: #5043
2. Scala backend injection model: this PR
3. Python runtime support
4. End-to-end integration
### How was this PR tested?
Commands run:
```bash
sbt "WorkflowOperator / Test / testOnly org.apache.texera.amber.operator.udf.python.PythonUdfUiParameterInjectorSpec"
sbt scalafmtAll
sbt scalafmtCheckAll "scalafixAll --check"
```
Results:
- `PythonUdfUiParameterInjectorSpec`: 10 tests passed.
- `scalafmtAll`: no file changes.
- `scalafmtCheckAll` and `scalafixAll --check`: passed.
### Was this PR authored or co-authored using generative AI tooling?
No
---------
Co-authored-by: Xiaozhen Liu <[email protected]>
---
.../texera/amber/pybuilder/PythonLexerUtils.scala | 62 +++++
.../amber/pybuilder/PythonLexerUtilsSpec.scala | 31 +++
.../apache/texera/amber/core/tuple/Attribute.java | 2 +
.../udf/python/PythonUdfUiParameterInjector.scala | 205 ++++++++++++++++
.../amber/operator/udf/python/UiUDFParameter.scala | 47 ++++
.../python/PythonUdfUiParameterInjectorSpec.scala | 262 +++++++++++++++++++++
6 files changed, 609 insertions(+)
diff --git
a/common/pybuilder/src/main/scala/org/apache/texera/amber/pybuilder/PythonLexerUtils.scala
b/common/pybuilder/src/main/scala/org/apache/texera/amber/pybuilder/PythonLexerUtils.scala
index 08aac3a9e8..75442f759d 100644
---
a/common/pybuilder/src/main/scala/org/apache/texera/amber/pybuilder/PythonLexerUtils.scala
+++
b/common/pybuilder/src/main/scala/org/apache/texera/amber/pybuilder/PythonLexerUtils.scala
@@ -37,6 +37,68 @@ object PythonLexerUtils {
if (lastNewlineIndex >= 0) s.substring(lastNewlineIndex + 1) else s
}
+ /**
+ * Update triple-quoted-string state after scanning one physical Python
source line.
+ *
+ * This is intentionally lightweight. It only tracks whether scanning is
inside a `'''` or `"""` string so callers
+ * that reason about indentation can avoid treating string contents as real
Python statements.
+ *
+ * Known limitations: escaped delimiters inside an active triple-quoted
string are still treated as closing
+ * delimiters, and delimiter-like runs next to ordinary string boundaries
may be detected because this helper does
+ * not fully parse Python string literal adjacency.
+ */
+ def updateTripleQuotedStringState(
+ line: String,
+ activeDelimiter: Option[String]
+ ): Option[String] = {
+ var delimiter = activeDelimiter
+ var inSingleQuotedString = false
+ var inDoubleQuotedString = false
+ var escaped = false
+ var index = 0
+
+ while (index < line.length) {
+ delimiter match {
+ case Some(active) =>
+ if (line.startsWith(active, index)) {
+ delimiter = None
+ index += active.length
+ } else {
+ index += 1
+ }
+
+ case None =>
+ val char = line.charAt(index)
+
+ if (escaped) {
+ escaped = false
+ index += 1
+ } else if ((inSingleQuotedString || inDoubleQuotedString) && char ==
'\\') {
+ escaped = true
+ index += 1
+ } else if (!inSingleQuotedString && !inDoubleQuotedString && char ==
'#') {
+ return delimiter
+ } else if (!inDoubleQuotedString && line.startsWith("'''", index)) {
+ delimiter = Some("'''")
+ index += 3
+ } else if (!inSingleQuotedString && line.startsWith("\"\"\"",
index)) {
+ delimiter = Some("\"\"\"")
+ index += 3
+ } else if (!inDoubleQuotedString && char == '\'') {
+ inSingleQuotedString = !inSingleQuotedString
+ index += 1
+ } else if (!inSingleQuotedString && char == '"') {
+ inDoubleQuotedString = !inDoubleQuotedString
+ index += 1
+ } else {
+ index += 1
+ }
+ }
+ }
+
+ delimiter
+ }
+
/**
* Detect whether the provided line tail contains an unclosed single or
double quote.
*
diff --git
a/common/pybuilder/src/test/scala/org/apache/texera/amber/pybuilder/PythonLexerUtilsSpec.scala
b/common/pybuilder/src/test/scala/org/apache/texera/amber/pybuilder/PythonLexerUtilsSpec.scala
index ea473969e7..0939d2c4a6 100644
---
a/common/pybuilder/src/test/scala/org/apache/texera/amber/pybuilder/PythonLexerUtilsSpec.scala
+++
b/common/pybuilder/src/test/scala/org/apache/texera/amber/pybuilder/PythonLexerUtilsSpec.scala
@@ -100,6 +100,37 @@ class PythonLexerUtilsSpec extends AnyFunSuite {
assert(PythonLexerUtils.lineTail(text) == "")
}
+ // -------- updateTripleQuotedStringState --------
+
+ test("updateTripleQuotedStringState: enters and exits triple single quoted
strings") {
+ val opened = PythonLexerUtils.updateTripleQuotedStringState("sql = '''",
None)
+ assert(opened.contains("'''"))
+
+ val stillOpen = PythonLexerUtils.updateTripleQuotedStringState("SELECT *
FROM t", opened)
+ assert(stillOpen.contains("'''"))
+
+ val closed = PythonLexerUtils.updateTripleQuotedStringState("'''",
stillOpen)
+ assert(closed.isEmpty)
+ }
+
+ test("updateTripleQuotedStringState: enters and exits triple double quoted
strings") {
+ val opened = PythonLexerUtils.updateTripleQuotedStringState("sql =
\"\"\"", None)
+ assert(opened.contains("\"\"\""))
+
+ val closed = PythonLexerUtils.updateTripleQuotedStringState("\"\"\"",
opened)
+ assert(closed.isEmpty)
+ }
+
+ test("updateTripleQuotedStringState: ignores triple quotes in single-line
comments") {
+ val state = PythonLexerUtils.updateTripleQuotedStringState("# ''' not a
string", None)
+ assert(state.isEmpty)
+ }
+
+ test("updateTripleQuotedStringState: ignores triple quotes inside ordinary
strings") {
+ val state = PythonLexerUtils.updateTripleQuotedStringState("value =
\"'''\"", None)
+ assert(state.isEmpty)
+ }
+
// -------- hasUnclosedQuote --------
test("hasUnclosedQuote: empty string has no unclosed quote") {
diff --git
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java
index 84d52fddce..fb434e0875 100644
---
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java
+++
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java
@@ -21,6 +21,7 @@ package org.apache.texera.amber.core.tuple;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.texera.amber.pybuilder.EncodableStringAnnotation;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
@@ -49,6 +50,7 @@ public class Attribute implements Serializable {
@JsonProperty(value = "attributeName", required = true)
@NotBlank(message = "Attribute name is required")
+ @EncodableStringAnnotation
public String getName() {
return attributeName;
}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala
new file mode 100644
index 0000000000..13c25a436e
--- /dev/null
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.texera.amber.operator.udf.python
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType}
+import
org.apache.texera.amber.pybuilder.PythonLexerUtils.updateTripleQuotedStringState
+import org.apache.texera.amber.pybuilder.PythonTemplateBuilder
+import
org.apache.texera.amber.pybuilder.PythonTemplateBuilder.PythonTemplateBuilderStringContext
+
+import scala.util.matching.Regex
+
+/**
+ * Injects the reserved UI-parameter hook into user-written Python UDF code.
+ *
+ * Operator descriptors should call this after loading saved
[[UiUDFParameter]] values and before sending Python source
+ * to runtime execution. The injected hook returns decoded parameter names
and values that Python runtime support reads
+ * before the user's `open()` method runs.
+ */
+object PythonUdfUiParameterInjector {
+
+ private val InjectedUiParametersHookMethodName =
"_texera_injected_ui_parameters"
+ private val InjectedUiParametersHookMethodHeader =
+ s"def $InjectedUiParametersHookMethodName(self) -> Dict[str, Any]:"
+ private val UnsupportedUiParameterTypes = Set(AttributeType.BINARY,
AttributeType.LARGE_BINARY)
+
+ // Keep supported user-facing UDF class names in sync with the frontend
parser.
+ private val SupportedPythonUdfClassHeaderRegex: Regex =
+ """(?m)^([
\t]*)class\s+(ProcessTupleOperator|ProcessBatchOperator|ProcessTableOperator|GenerateOperator)\s*\([^)]*\)\s*:\s*(?:#.*)?$""".r
+
+ private def validate(uiParameters: List[UiUDFParameter]): Unit = {
+ val attributes = uiParameters.map(parameterAttribute)
+ attributes.foreach(validateSupportedType)
+
+ attributes
+ .groupBy(_.getName)
+ .collectFirst {
+ case (parameterName, matchingAttributes) if matchingAttributes.size >
1 => parameterName
+ }
+ .foreach { duplicateName =>
+ throw new RuntimeException(s"UiParameter name '$duplicateName' is
declared more than once.")
+ }
+ }
+
+ private def parameterAttribute(parameter: UiUDFParameter): Attribute =
+ Option(parameter).flatMap(parameter =>
Option(parameter.attribute)).getOrElse {
+ throw new RuntimeException("UiParameter attribute is required.")
+ }
+
+ private def validateSupportedType(attribute: Attribute): Unit = {
+ if (UnsupportedUiParameterTypes.contains(attribute.getType)) {
+ throw new RuntimeException(
+ s"UiParameter type '${attribute.getType.name()}' is not supported. " +
+ "Use string, integer, long, double, boolean, or timestamp instead."
+ )
+ }
+ }
+
+ private def buildInjectedParameterEntry(parameter: UiUDFParameter):
PythonTemplateBuilder = {
+ pyb"${parameter.attribute.getName}: ${parameter.value}"
+ }
+
+ private def buildInjectedParametersMap(
+ uiParameters: List[UiUDFParameter]
+ ): PythonTemplateBuilder = {
+ val entries = uiParameters.map(buildInjectedParameterEntry)
+ entries.reduceOption((acc, entry) => acc + pyb", " +
entry).getOrElse(pyb"")
+ }
+
+ private def buildInjectedHookMethod(uiParameters: List[UiUDFParameter]):
String = {
+ val injectedParametersMap = buildInjectedParametersMap(uiParameters)
+
+ (pyb"""|# Follow-up runtime support exports Dict/Any and defines the base
hook that @overrides targets.
+ |@overrides
+ |$InjectedUiParametersHookMethodHeader
+ | return {""" +
+ injectedParametersMap +
+ pyb"""}
+ |""").encode
+ }
+
+ private def indentBlock(block: String, indent: String): String = {
+ block
+ .split("\n", -1)
+ .map { line =>
+ if (line.nonEmpty) indent + line else line
+ }
+ .mkString("\n")
+ }
+
+ private def lineEndIndex(text: String, from: Int): Int = {
+ val lineEnd = text.indexOf('\n', from)
+ if (lineEnd < 0) text.length else lineEnd
+ }
+
+ private def detectClassBlockEnd(code: String, classHeaderStart: Int,
classIndent: String): Int = {
+ val classLineEnd = lineEndIndex(code, classHeaderStart)
+ var lineStart = if (classLineEnd < code.length) classLineEnd + 1 else
code.length
+ var tripleQuotedStringDelimiter: Option[String] = None
+
+ while (lineStart < code.length) {
+ val lineEnd = lineEndIndex(code, lineStart)
+ val line = code.substring(lineStart, lineEnd)
+
+ val trimmed = line.trim
+ val isBlank = trimmed.isEmpty
+
+ val currentIndentLen = line.segmentLength(ch => ch == ' ' || ch == '\t')
+ val classIndentLen = classIndent.length
+
+ if (tripleQuotedStringDelimiter.isEmpty && !isBlank && currentIndentLen
<= classIndentLen) {
+ return lineStart
+ }
+
+ tripleQuotedStringDelimiter = updateTripleQuotedStringState(line,
tripleQuotedStringDelimiter)
+
+ lineStart = if (lineEnd < code.length) lineEnd + 1 else code.length
+ }
+
+ code.length
+ }
+
+ private def containsReservedHook(classBlock: String): Boolean = {
+ val hookRegex =
+ ("""(?m)^[ \t]+def\s+""" +
Regex.quote(InjectedUiParametersHookMethodName) + """\s*\(""").r
+ hookRegex.findFirstIn(classBlock).isDefined
+ }
+
+ private def injectHookIntoUserClass(userCode: String, hookMethod: String):
String = {
+ val classHeaderMatch =
+ SupportedPythonUdfClassHeaderRegex.findFirstMatchIn(userCode).getOrElse {
+ throw new RuntimeException(
+ "UiParameters were provided, but no supported Python UDF class was
found. " +
+ "Use one of ProcessTupleOperator, ProcessBatchOperator,
ProcessTableOperator, or GenerateOperator."
+ )
+ }
+
+ val classHeaderStart = classHeaderMatch.start
+ val classIndent = classHeaderMatch.group(1)
+ val classBlockEnd = detectClassBlockEnd(userCode, classHeaderStart,
classIndent)
+
+ val classBlock = userCode.substring(classHeaderStart, classBlockEnd)
+
+ if (containsReservedHook(classBlock)) {
+ throw new RuntimeException(
+ s"Reserved method '$InjectedUiParametersHookMethodName' is already
defined in the UDF class. Please rename your method."
+ )
+ }
+
+ val bodyIndent = inferClassBodyIndent(classBlock,
classIndent).getOrElse(classIndent + " ")
+ val indentedHook = indentBlock(
+ (if (classBlock.endsWith("\n")) "" else "\n") + hookMethod.trim + "\n",
+ bodyIndent
+ )
+
+ userCode.substring(0, classBlockEnd) +
+ indentedHook +
+ userCode.substring(classBlockEnd)
+ }
+
+ private def inferClassBodyIndent(classBlock: String, classIndent: String):
Option[String] = {
+ val lines = classBlock.split("\n", -1).toList.drop(1)
+
+ lines.collectFirst {
+ case line if line.trim.nonEmpty =>
+ val leading = line.takeWhile(ch => ch == ' ' || ch == '\t')
+ if (leading.length > classIndent.length) leading else classIndent + "
"
+ }
+ }
+
+ /**
+ * Returns Python code with the UI-parameter hook injected into the
supported UDF class.
+ *
+ * If `uiParameters` is empty, the code is returned unchanged. Throws
[[RuntimeException]] when parameter metadata is
+ * invalid, the user already defines the reserved hook method, or
parameters are provided for an unsupported class.
+ */
+ def inject(code: String, uiParameters: List[UiUDFParameter]): String = {
+ val parameters = Option(uiParameters).getOrElse(List.empty)
+ validate(parameters)
+
+ val userCode = Option(code).getOrElse("")
+
+ if (parameters.isEmpty) {
+ return userCode
+ }
+
+ val hookMethod = buildInjectedHookMethod(parameters)
+ injectHookIntoUserClass(userCode, hookMethod)
+ }
+}
diff --git
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala
new file mode 100644
index 0000000000..b18b9a181d
--- /dev/null
+++
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.udf.python
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.texera.amber.core.tuple.Attribute
+import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableString
+
+import javax.validation.Valid
+import javax.validation.constraints.NotNull
+
+/**
+ * Serialized operator property for one Python UDF UI parameter.
+ *
+ * `attribute` carries the inferred parameter name and type. `value` is
user-entered text and is marked as
+ * [[EncodableString]] so Python code generation decodes it at runtime
instead of embedding raw text into generated code.
+ */
+class UiUDFParameter {
+
+ @JsonProperty(required = true)
+ @JsonSchemaTitle("Attribute")
+ @Valid
+ @NotNull(message = "Attribute is required")
+ var attribute: Attribute = _
+
+ @JsonProperty()
+ @JsonSchemaTitle("Value")
+ var value: EncodableString = ""
+}
diff --git
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala
new file mode 100644
index 0000000000..d5a2534758
--- /dev/null
+++
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.texera.amber.operator.udf.python
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class PythonUdfUiParameterInjectorSpec extends AnyFlatSpec with Matchers {
+
+ private def uiParameter(
+ attributeName: String,
+ attributeType: AttributeType,
+ value: String
+ ): UiUDFParameter = {
+ val parameter = new UiUDFParameter
+ parameter.attribute = new Attribute(attributeName, attributeType)
+ parameter.value = value
+ parameter
+ }
+
+ private def inject(parameters: UiUDFParameter*): String =
+ PythonUdfUiParameterInjector.inject(baseUdfCode, parameters.toList)
+
+ private def inject(code: String, parameters: UiUDFParameter*): String =
+ PythonUdfUiParameterInjector.inject(code, parameters.toList)
+
+ private def decoderCallCount(code: String): Int =
+ code.sliding("self.decode_python_template".length).count(_ ==
"self.decode_python_template")
+
+ private val baseUdfCode: String =
+ """from pytexera import *
+ |
+ |class ProcessTupleOperator(UDFOperatorV2):
+ | @overrides
+ | def open(self):
+ | print("open")
+ |
+ | @overrides
+ | def process_tuple(self, tuple_: Tuple, port: int):
+ | yield tuple_
+ |""".stripMargin
+
+ it should "return user code unchanged when there are no UI parameters" in {
+ val injectedCode = inject()
+
+ injectedCode should include("class ProcessTupleOperator(UDFOperatorV2):")
+ injectedCode should include("""print("open")""")
+ injectedCode should not include ("_texera_injected_ui_parameters")
+ injectedCode should not include ("self.decode_python_template")
+ injectedCode should not include ("import typing")
+ }
+
+ it should "return unsupported user code unchanged when there are no UI
parameters" in {
+ val nonSupportedCode =
+ """from pytexera import *
+ |
+ |class SomethingElse:
+ | def open(self):
+ | pass
+ |""".stripMargin
+
+ inject(nonSupportedCode) shouldBe nonSupportedCode
+ }
+
+ it should "preserve user source lines that look like Scala stripMargin
input" in {
+ val udfCodeWithPipeLine =
+ """from pytexera import *
+ |
+ |class ProcessTupleOperator(UDFOperatorV2):
+ | def open(self):
+ | pattern = "keep"
+ | text = '''
+ | |do not strip this line
+ |'''
+ |
+ | def process_tuple(self, tuple_: Tuple, port: int):
+ | yield tuple_
+ |""".stripMargin
+
+ val injectedCode = inject(udfCodeWithPipeLine, uiParameter("k",
AttributeType.STRING, "v"))
+
+ injectedCode should include(" |do not strip this line")
+ injectedCode should include("def _texera_injected_ui_parameters(self) ->
Dict[str, Any]:")
+ }
+
+ it should "inject UI parameter hook into supported UDF class using Dict and
Any from pytexera" in {
+ val injectedCode = inject(uiParameter("date", AttributeType.TIMESTAMP,
"2024-01-01T00:00:00Z"))
+
+ injectedCode should include("class ProcessTupleOperator(UDFOperatorV2):")
+ injectedCode should include(
+ "# Follow-up runtime support exports Dict/Any and defines the base hook
that @overrides targets."
+ )
+ injectedCode should include("def _texera_injected_ui_parameters(self) ->
Dict[str, Any]:")
+ injectedCode should include("return {")
+ injectedCode should include("self.decode_python_template")
+ decoderCallCount(injectedCode) shouldBe 2
+ injectedCode should include("""print("open")""")
+ injectedCode should not include ("import typing")
+ injectedCode should not include ("typing.Dict")
+ injectedCode should not include ("typing.Any")
+ }
+
+ it should "append the reserved hook inside the class before the next
top-level statement" in {
+ val udfCodeWithSiblingDefinition =
+ """from pytexera import *
+ |
+ |class ProcessTupleOperator(UDFOperatorV2):
+ | @overrides
+ | def open(self):
+ | print("open")
+ |
+ | @overrides
+ | def process_tuple(self, tuple_: Tuple, port: int):
+ | yield tuple_
+ |
+ |def helper():
+ | return "outside"
+ |""".stripMargin
+
+ val injectedCode =
+ inject(udfCodeWithSiblingDefinition, uiParameter("k",
AttributeType.STRING, "v"))
+
+ val hookIndex = injectedCode.indexOf("def
_texera_injected_ui_parameters(self)")
+ val processTupleIndex =
+ injectedCode.indexOf("def process_tuple(self, tuple_: Tuple, port:
int):")
+ val helperIndex = injectedCode.indexOf("def helper():")
+
+ hookIndex should be >= 0
+ processTupleIndex should be < hookIndex
+ helperIndex should be > hookIndex
+ }
+
+ it should "append the reserved hook after triple-quoted strings that contain
top-level-looking lines" in {
+ val udfCodeWithTripleQuotedString =
+ """from pytexera import *
+ |
+ |class ProcessTupleOperator(UDFOperatorV2):
+ | def process_tuple(self, tuple_: Tuple, port: int):
+ | sql = '''
+ |SELECT * FROM t
+ |'''
+ | yield tuple_
+ |
+ |def helper():
+ | return "outside"
+ |""".stripMargin
+
+ val injectedCode =
+ inject(udfCodeWithTripleQuotedString, uiParameter("k",
AttributeType.STRING, "v"))
+
+ val hookIndex = injectedCode.indexOf("def
_texera_injected_ui_parameters(self)")
+ val stringEndIndex = injectedCode.indexOf("'''\n yield tuple_")
+ val helperIndex = injectedCode.indexOf("def helper():")
+
+ stringEndIndex should be >= 0
+ stringEndIndex should be < hookIndex
+ hookIndex should be < helperIndex
+ }
+
+ it should "preserve multiple UI parameters in the injected map" in {
+ val injectedCode = inject(
+ uiParameter("param1", AttributeType.DOUBLE, "12.5"),
+ uiParameter("param2", AttributeType.INTEGER, "1"),
+ uiParameter("param3", AttributeType.STRING, "Hola"),
+ uiParameter("param4", AttributeType.TIMESTAMP, "2026-02-28T03:15:00Z")
+ )
+
+ injectedCode should include("def _texera_injected_ui_parameters(self) ->
Dict[str, Any]:")
+ injectedCode should include("self.decode_python_template")
+ decoderCallCount(injectedCode) shouldBe 8
+ injectedCode should not include ("import typing")
+ }
+
+ it should "throw when a parameter attribute is missing" in {
+ val invalidParameter = new UiUDFParameter
+ invalidParameter.attribute = null
+ invalidParameter.value = "anything"
+
+ val exception = the[RuntimeException] thrownBy {
+ inject(invalidParameter)
+ }
+
+ exception.getMessage should include("UiParameter attribute is required")
+ }
+
+ it should "throw when a UI parameter name is duplicated" in {
+ val exception = the[RuntimeException] thrownBy {
+ inject(
+ uiParameter("date", AttributeType.STRING, "2024-01-01"),
+ uiParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z")
+ )
+ }
+
+ exception.getMessage should include("UiParameter name 'date' is declared
more than once")
+ }
+
+ Seq(AttributeType.BINARY, AttributeType.LARGE_BINARY).foreach {
unsupportedType =>
+ it should s"throw when a UI parameter uses ${unsupportedType.name()} type"
in {
+ val exception = the[RuntimeException] thrownBy {
+ inject(uiParameter("payload", unsupportedType, "68656c6c6f"))
+ }
+
+ exception.getMessage should include(
+ s"UiParameter type '${unsupportedType.name()}' is not supported"
+ )
+ }
+ }
+
+ it should "throw when the reserved hook is already defined by the user" in {
+ val udfWithReservedHook =
+ """from pytexera import *
+ |
+ |class ProcessTupleOperator(UDFOperatorV2):
+ | def _texera_injected_ui_parameters(self):
+ | return {}
+ |
+ | def open(self):
+ | pass
+ |""".stripMargin
+
+ val exception = the[RuntimeException] thrownBy {
+ inject(udfWithReservedHook, uiParameter("k", AttributeType.STRING, "v"))
+ }
+
+ exception.getMessage should include(
+ "Reserved method '_texera_injected_ui_parameters' is already defined"
+ )
+ }
+
+ it should "throw when UI parameters are provided but no supported user class
is present" in {
+ val nonSupportedCode =
+ """from pytexera import *
+ |
+ |class SomethingElse:
+ | def open(self):
+ | pass
+ |""".stripMargin
+
+ val exception = the[RuntimeException] thrownBy {
+ inject(nonSupportedCode, uiParameter("k", AttributeType.STRING, "v"))
+ }
+
+ exception.getMessage should include("no supported Python UDF class was
found")
+ }
+}