This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5141-7a6c22a09b0efd60cee35e106dbbc66de2a73a55
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 7deed35fddf58bc910991af920f0cf4d16c46f9b
Author: carloea2 <[email protected]>
AuthorDate: Thu Jun 4 11:49:59 2026 -0700

    feat(workflow-operator): add Python UDF UI parameter injection model (#5141)
    
    ### What changes were proposed in this PR?
    
    This PR adds the Scala backend foundation for Python UDF UI parameters.
    
    It introduces:
    
    | Area | Change |
    | --- | --- |
    | UI parameter model | Adds `UiUDFParameter`, containing backend-compatible `attribute` metadata and an editable `value`. |
    | Python UDF injector | Adds `PythonUdfUiParameterInjector`, which validates UI parameters and injects a reserved hook method into supported Python UDF classes. |
    | Safe string encoding | Marks `Attribute.getName()` as encodable so UI parameter names are safely rendered through the Python template builder. |
    | Test coverage | Adds Scala tests for hook injection, validation, unsupported types, reserved method conflicts, and unchanged behavior when no UI parameters exist. |
    | Test coverage | Adds Scala tests for hook injection, validation,
    unsupported types, reserved method conflicts, and unchanged behavior
    when no UI parameters exist. |
    
    This PR is stacked after the merged frontend foundation PR #5043. It
    does not yet wire the injector into operator execution; that wiring is
    handled by later PRs in the stack.
    
    Existing Python UDF workflow execution remains unchanged in this PR
    because `PythonUDFOpDescV2`, `PythonUDFSourceOpDescV2`, and
    `DualInputPortsPythonUDFOpDescV2` are not modified here.
    
    ### Any related issues, documentation, discussions?
    
    Part of the Python UDF UI parameter feature split from
    `feat/ui-parameter`.
    
    Related tracking issue / stack: #5044
    
    Stack order:
    
    1. Frontend UI parameter building blocks: #5043
    2. Scala backend injection model: this PR
    3. Python runtime support
    4. End-to-end integration
    
    ### How was this PR tested?
    
    Commands run:
    
    ```bash
    sbt "WorkflowOperator / Test / testOnly org.apache.texera.amber.operator.udf.python.PythonUdfUiParameterInjectorSpec"
    sbt scalafmtAll
    sbt scalafmtCheckAll "scalafixAll --check"
    ```
    
    Results:
    
    - `PythonUdfUiParameterInjectorSpec`: 10 tests passed.
    - `scalafmtAll`: no file changes.
    - `scalafmtCheckAll` and `scalafixAll --check`: passed.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    No
    
    ---------
    
    Co-authored-by: Xiaozhen Liu <[email protected]>
---
 .../texera/amber/pybuilder/PythonLexerUtils.scala  |  62 +++++
 .../amber/pybuilder/PythonLexerUtilsSpec.scala     |  31 +++
 .../apache/texera/amber/core/tuple/Attribute.java  |   2 +
 .../udf/python/PythonUdfUiParameterInjector.scala  | 205 ++++++++++++++++
 .../amber/operator/udf/python/UiUDFParameter.scala |  47 ++++
 .../python/PythonUdfUiParameterInjectorSpec.scala  | 262 +++++++++++++++++++++
 6 files changed, 609 insertions(+)

diff --git 
a/common/pybuilder/src/main/scala/org/apache/texera/amber/pybuilder/PythonLexerUtils.scala
 
b/common/pybuilder/src/main/scala/org/apache/texera/amber/pybuilder/PythonLexerUtils.scala
index 08aac3a9e8..75442f759d 100644
--- 
a/common/pybuilder/src/main/scala/org/apache/texera/amber/pybuilder/PythonLexerUtils.scala
+++ 
b/common/pybuilder/src/main/scala/org/apache/texera/amber/pybuilder/PythonLexerUtils.scala
@@ -37,6 +37,68 @@ object PythonLexerUtils {
     if (lastNewlineIndex >= 0) s.substring(lastNewlineIndex + 1) else s
   }
 
+  /**
+    * Update triple-quoted-string state after scanning one physical Python 
source line.
+    *
+    * This is intentionally lightweight. It only tracks whether scanning is 
inside a `'''` or `"""` string so callers
+    * that reason about indentation can avoid treating string contents as real 
Python statements.
+    *
+    * Known limitations: escaped delimiters inside an active triple-quoted 
string are still treated as closing
+    * delimiters, and delimiter-like runs next to ordinary string boundaries 
may be detected because this helper does
+    * not fully parse Python string literal adjacency.
+    */
+  def updateTripleQuotedStringState(
+      line: String,
+      activeDelimiter: Option[String]
+  ): Option[String] = {
+    var delimiter = activeDelimiter
+    var inSingleQuotedString = false
+    var inDoubleQuotedString = false
+    var escaped = false
+    var index = 0
+
+    while (index < line.length) {
+      delimiter match {
+        case Some(active) =>
+          if (line.startsWith(active, index)) {
+            delimiter = None
+            index += active.length
+          } else {
+            index += 1
+          }
+
+        case None =>
+          val char = line.charAt(index)
+
+          if (escaped) {
+            escaped = false
+            index += 1
+          } else if ((inSingleQuotedString || inDoubleQuotedString) && char == 
'\\') {
+            escaped = true
+            index += 1
+          } else if (!inSingleQuotedString && !inDoubleQuotedString && char == 
'#') {
+            return delimiter
+          } else if (!inDoubleQuotedString && line.startsWith("'''", index)) {
+            delimiter = Some("'''")
+            index += 3
+          } else if (!inSingleQuotedString && line.startsWith("\"\"\"", 
index)) {
+            delimiter = Some("\"\"\"")
+            index += 3
+          } else if (!inDoubleQuotedString && char == '\'') {
+            inSingleQuotedString = !inSingleQuotedString
+            index += 1
+          } else if (!inSingleQuotedString && char == '"') {
+            inDoubleQuotedString = !inDoubleQuotedString
+            index += 1
+          } else {
+            index += 1
+          }
+      }
+    }
+
+    delimiter
+  }
+
   /**
     * Detect whether the provided line tail contains an unclosed single or 
double quote.
     *
diff --git 
a/common/pybuilder/src/test/scala/org/apache/texera/amber/pybuilder/PythonLexerUtilsSpec.scala
 
b/common/pybuilder/src/test/scala/org/apache/texera/amber/pybuilder/PythonLexerUtilsSpec.scala
index ea473969e7..0939d2c4a6 100644
--- 
a/common/pybuilder/src/test/scala/org/apache/texera/amber/pybuilder/PythonLexerUtilsSpec.scala
+++ 
b/common/pybuilder/src/test/scala/org/apache/texera/amber/pybuilder/PythonLexerUtilsSpec.scala
@@ -100,6 +100,37 @@ class PythonLexerUtilsSpec extends AnyFunSuite {
     assert(PythonLexerUtils.lineTail(text) == "")
   }
 
+  // -------- updateTripleQuotedStringState --------
+
+  test("updateTripleQuotedStringState: enters and exits triple single quoted 
strings") {
+    val opened = PythonLexerUtils.updateTripleQuotedStringState("sql = '''", 
None)
+    assert(opened.contains("'''"))
+
+    val stillOpen = PythonLexerUtils.updateTripleQuotedStringState("SELECT * 
FROM t", opened)
+    assert(stillOpen.contains("'''"))
+
+    val closed = PythonLexerUtils.updateTripleQuotedStringState("'''", 
stillOpen)
+    assert(closed.isEmpty)
+  }
+
+  test("updateTripleQuotedStringState: enters and exits triple double quoted 
strings") {
+    val opened = PythonLexerUtils.updateTripleQuotedStringState("sql = 
\"\"\"", None)
+    assert(opened.contains("\"\"\""))
+
+    val closed = PythonLexerUtils.updateTripleQuotedStringState("\"\"\"", 
opened)
+    assert(closed.isEmpty)
+  }
+
+  test("updateTripleQuotedStringState: ignores triple quotes in single-line 
comments") {
+    val state = PythonLexerUtils.updateTripleQuotedStringState("# ''' not a 
string", None)
+    assert(state.isEmpty)
+  }
+
+  test("updateTripleQuotedStringState: ignores triple quotes inside ordinary 
strings") {
+    val state = PythonLexerUtils.updateTripleQuotedStringState("value = 
\"'''\"", None)
+    assert(state.isEmpty)
+  }
+
   // -------- hasUnclosedQuote --------
 
   test("hasUnclosedQuote: empty string has no unclosed quote") {
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java
index 84d52fddce..fb434e0875 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java
@@ -21,6 +21,7 @@ package org.apache.texera.amber.core.tuple;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.texera.amber.pybuilder.EncodableStringAnnotation;
 
 import javax.validation.constraints.NotBlank;
 import javax.validation.constraints.NotNull;
@@ -49,6 +50,7 @@ public class Attribute implements Serializable {
 
     @JsonProperty(value = "attributeName", required = true)
     @NotBlank(message = "Attribute name is required")
+    @EncodableStringAnnotation
     public String getName() {
         return attributeName;
     }
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala
new file mode 100644
index 0000000000..13c25a436e
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.texera.amber.operator.udf.python
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType}
+import 
org.apache.texera.amber.pybuilder.PythonLexerUtils.updateTripleQuotedStringState
+import org.apache.texera.amber.pybuilder.PythonTemplateBuilder
+import 
org.apache.texera.amber.pybuilder.PythonTemplateBuilder.PythonTemplateBuilderStringContext
+
+import scala.util.matching.Regex
+
+/**
+  * Injects the reserved UI-parameter hook into user-written Python UDF code.
+  *
+  * Operator descriptors should call this after loading saved 
[[UiUDFParameter]] values and before sending Python source
+  * to runtime execution. The injected hook returns decoded parameter names 
and values that Python runtime support reads
+  * before the user's `open()` method runs.
+  */
+object PythonUdfUiParameterInjector {
+
+  private val InjectedUiParametersHookMethodName = 
"_texera_injected_ui_parameters"
+  private val InjectedUiParametersHookMethodHeader =
+    s"def $InjectedUiParametersHookMethodName(self) -> Dict[str, Any]:"
+  private val UnsupportedUiParameterTypes = Set(AttributeType.BINARY, 
AttributeType.LARGE_BINARY)
+
+  // Keep supported user-facing UDF class names in sync with the frontend 
parser.
+  private val SupportedPythonUdfClassHeaderRegex: Regex =
+    """(?m)^([ 
\t]*)class\s+(ProcessTupleOperator|ProcessBatchOperator|ProcessTableOperator|GenerateOperator)\s*\([^)]*\)\s*:\s*(?:#.*)?$""".r
+
+  private def validate(uiParameters: List[UiUDFParameter]): Unit = {
+    val attributes = uiParameters.map(parameterAttribute)
+    attributes.foreach(validateSupportedType)
+
+    attributes
+      .groupBy(_.getName)
+      .collectFirst {
+        case (parameterName, matchingAttributes) if matchingAttributes.size > 
1 => parameterName
+      }
+      .foreach { duplicateName =>
+        throw new RuntimeException(s"UiParameter name '$duplicateName' is 
declared more than once.")
+      }
+  }
+
+  private def parameterAttribute(parameter: UiUDFParameter): Attribute =
+    Option(parameter).flatMap(parameter => 
Option(parameter.attribute)).getOrElse {
+      throw new RuntimeException("UiParameter attribute is required.")
+    }
+
+  private def validateSupportedType(attribute: Attribute): Unit = {
+    if (UnsupportedUiParameterTypes.contains(attribute.getType)) {
+      throw new RuntimeException(
+        s"UiParameter type '${attribute.getType.name()}' is not supported. " +
+          "Use string, integer, long, double, boolean, or timestamp instead."
+      )
+    }
+  }
+
+  private def buildInjectedParameterEntry(parameter: UiUDFParameter): 
PythonTemplateBuilder = {
+    pyb"${parameter.attribute.getName}: ${parameter.value}"
+  }
+
+  private def buildInjectedParametersMap(
+      uiParameters: List[UiUDFParameter]
+  ): PythonTemplateBuilder = {
+    val entries = uiParameters.map(buildInjectedParameterEntry)
+    entries.reduceOption((acc, entry) => acc + pyb", " + 
entry).getOrElse(pyb"")
+  }
+
+  private def buildInjectedHookMethod(uiParameters: List[UiUDFParameter]): 
String = {
+    val injectedParametersMap = buildInjectedParametersMap(uiParameters)
+
+    (pyb"""|# Follow-up runtime support exports Dict/Any and defines the base 
hook that @overrides targets.
+           |@overrides
+           |$InjectedUiParametersHookMethodHeader
+           |    return {""" +
+      injectedParametersMap +
+      pyb"""}
+           |""").encode
+  }
+
+  private def indentBlock(block: String, indent: String): String = {
+    block
+      .split("\n", -1)
+      .map { line =>
+        if (line.nonEmpty) indent + line else line
+      }
+      .mkString("\n")
+  }
+
+  private def lineEndIndex(text: String, from: Int): Int = {
+    val lineEnd = text.indexOf('\n', from)
+    if (lineEnd < 0) text.length else lineEnd
+  }
+
+  private def detectClassBlockEnd(code: String, classHeaderStart: Int, 
classIndent: String): Int = {
+    val classLineEnd = lineEndIndex(code, classHeaderStart)
+    var lineStart = if (classLineEnd < code.length) classLineEnd + 1 else 
code.length
+    var tripleQuotedStringDelimiter: Option[String] = None
+
+    while (lineStart < code.length) {
+      val lineEnd = lineEndIndex(code, lineStart)
+      val line = code.substring(lineStart, lineEnd)
+
+      val trimmed = line.trim
+      val isBlank = trimmed.isEmpty
+
+      val currentIndentLen = line.segmentLength(ch => ch == ' ' || ch == '\t')
+      val classIndentLen = classIndent.length
+
+      if (tripleQuotedStringDelimiter.isEmpty && !isBlank && currentIndentLen 
<= classIndentLen) {
+        return lineStart
+      }
+
+      tripleQuotedStringDelimiter = updateTripleQuotedStringState(line, 
tripleQuotedStringDelimiter)
+
+      lineStart = if (lineEnd < code.length) lineEnd + 1 else code.length
+    }
+
+    code.length
+  }
+
+  private def containsReservedHook(classBlock: String): Boolean = {
+    val hookRegex =
+      ("""(?m)^[ \t]+def\s+""" + 
Regex.quote(InjectedUiParametersHookMethodName) + """\s*\(""").r
+    hookRegex.findFirstIn(classBlock).isDefined
+  }
+
+  private def injectHookIntoUserClass(userCode: String, hookMethod: String): 
String = {
+    val classHeaderMatch =
+      SupportedPythonUdfClassHeaderRegex.findFirstMatchIn(userCode).getOrElse {
+        throw new RuntimeException(
+          "UiParameters were provided, but no supported Python UDF class was 
found. " +
+            "Use one of ProcessTupleOperator, ProcessBatchOperator, 
ProcessTableOperator, or GenerateOperator."
+        )
+      }
+
+    val classHeaderStart = classHeaderMatch.start
+    val classIndent = classHeaderMatch.group(1)
+    val classBlockEnd = detectClassBlockEnd(userCode, classHeaderStart, 
classIndent)
+
+    val classBlock = userCode.substring(classHeaderStart, classBlockEnd)
+
+    if (containsReservedHook(classBlock)) {
+      throw new RuntimeException(
+        s"Reserved method '$InjectedUiParametersHookMethodName' is already 
defined in the UDF class. Please rename your method."
+      )
+    }
+
+    val bodyIndent = inferClassBodyIndent(classBlock, 
classIndent).getOrElse(classIndent + "    ")
+    val indentedHook = indentBlock(
+      (if (classBlock.endsWith("\n")) "" else "\n") + hookMethod.trim + "\n",
+      bodyIndent
+    )
+
+    userCode.substring(0, classBlockEnd) +
+      indentedHook +
+      userCode.substring(classBlockEnd)
+  }
+
+  private def inferClassBodyIndent(classBlock: String, classIndent: String): 
Option[String] = {
+    val lines = classBlock.split("\n", -1).toList.drop(1)
+
+    lines.collectFirst {
+      case line if line.trim.nonEmpty =>
+        val leading = line.takeWhile(ch => ch == ' ' || ch == '\t')
+        if (leading.length > classIndent.length) leading else classIndent + "  
  "
+    }
+  }
+
+  /**
+    * Returns Python code with the UI-parameter hook injected into the 
supported UDF class.
+    *
+    * If `uiParameters` is empty, the code is returned unchanged. Throws 
[[RuntimeException]] when parameter metadata is
+    * invalid, the user already defines the reserved hook method, or 
parameters are provided for an unsupported class.
+    */
+  def inject(code: String, uiParameters: List[UiUDFParameter]): String = {
+    val parameters = Option(uiParameters).getOrElse(List.empty)
+    validate(parameters)
+
+    val userCode = Option(code).getOrElse("")
+
+    if (parameters.isEmpty) {
+      return userCode
+    }
+
+    val hookMethod = buildInjectedHookMethod(parameters)
+    injectHookIntoUserClass(userCode, hookMethod)
+  }
+}
diff --git 
a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala
 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala
new file mode 100644
index 0000000000..b18b9a181d
--- /dev/null
+++ 
b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.udf.python
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.texera.amber.core.tuple.Attribute
+import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableString
+
+import javax.validation.Valid
+import javax.validation.constraints.NotNull
+
+/**
+  * Serialized operator property for one Python UDF UI parameter.
+  *
+  * `attribute` carries the inferred parameter name and type. `value` is 
user-entered text and is marked as
+  * [[EncodableString]] so Python code generation decodes it at runtime 
instead of embedding raw text into generated code.
+  */
+class UiUDFParameter {
+
+  @JsonProperty(required = true)
+  @JsonSchemaTitle("Attribute")
+  @Valid
+  @NotNull(message = "Attribute is required")
+  var attribute: Attribute = _
+
+  @JsonProperty()
+  @JsonSchemaTitle("Value")
+  var value: EncodableString = ""
+}
diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala
 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala
new file mode 100644
index 0000000000..d5a2534758
--- /dev/null
+++ 
b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.texera.amber.operator.udf.python
+
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType}
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class PythonUdfUiParameterInjectorSpec extends AnyFlatSpec with Matchers {
+
+  private def uiParameter(
+      attributeName: String,
+      attributeType: AttributeType,
+      value: String
+  ): UiUDFParameter = {
+    val parameter = new UiUDFParameter
+    parameter.attribute = new Attribute(attributeName, attributeType)
+    parameter.value = value
+    parameter
+  }
+
+  private def inject(parameters: UiUDFParameter*): String =
+    PythonUdfUiParameterInjector.inject(baseUdfCode, parameters.toList)
+
+  private def inject(code: String, parameters: UiUDFParameter*): String =
+    PythonUdfUiParameterInjector.inject(code, parameters.toList)
+
+  private def decoderCallCount(code: String): Int =
+    code.sliding("self.decode_python_template".length).count(_ == 
"self.decode_python_template")
+
+  private val baseUdfCode: String =
+    """from pytexera import *
+      |
+      |class ProcessTupleOperator(UDFOperatorV2):
+      |    @overrides
+      |    def open(self):
+      |        print("open")
+      |
+      |    @overrides
+      |    def process_tuple(self, tuple_: Tuple, port: int):
+      |        yield tuple_
+      |""".stripMargin
+
+  it should "return user code unchanged when there are no UI parameters" in {
+    val injectedCode = inject()
+
+    injectedCode should include("class ProcessTupleOperator(UDFOperatorV2):")
+    injectedCode should include("""print("open")""")
+    injectedCode should not include ("_texera_injected_ui_parameters")
+    injectedCode should not include ("self.decode_python_template")
+    injectedCode should not include ("import typing")
+  }
+
+  it should "return unsupported user code unchanged when there are no UI 
parameters" in {
+    val nonSupportedCode =
+      """from pytexera import *
+        |
+        |class SomethingElse:
+        |    def open(self):
+        |        pass
+        |""".stripMargin
+
+    inject(nonSupportedCode) shouldBe nonSupportedCode
+  }
+
+  it should "preserve user source lines that look like Scala stripMargin 
input" in {
+    val udfCodeWithPipeLine =
+      """from pytexera import *
+        |
+        |class ProcessTupleOperator(UDFOperatorV2):
+        |    def open(self):
+        |        pattern = "keep"
+        |        text = '''
+        |    |do not strip this line
+        |'''
+        |
+        |    def process_tuple(self, tuple_: Tuple, port: int):
+        |        yield tuple_
+        |""".stripMargin
+
+    val injectedCode = inject(udfCodeWithPipeLine, uiParameter("k", 
AttributeType.STRING, "v"))
+
+    injectedCode should include("    |do not strip this line")
+    injectedCode should include("def _texera_injected_ui_parameters(self) -> 
Dict[str, Any]:")
+  }
+
+  it should "inject UI parameter hook into supported UDF class using Dict and 
Any from pytexera" in {
+    val injectedCode = inject(uiParameter("date", AttributeType.TIMESTAMP, 
"2024-01-01T00:00:00Z"))
+
+    injectedCode should include("class ProcessTupleOperator(UDFOperatorV2):")
+    injectedCode should include(
+      "# Follow-up runtime support exports Dict/Any and defines the base hook 
that @overrides targets."
+    )
+    injectedCode should include("def _texera_injected_ui_parameters(self) -> 
Dict[str, Any]:")
+    injectedCode should include("return {")
+    injectedCode should include("self.decode_python_template")
+    decoderCallCount(injectedCode) shouldBe 2
+    injectedCode should include("""print("open")""")
+    injectedCode should not include ("import typing")
+    injectedCode should not include ("typing.Dict")
+    injectedCode should not include ("typing.Any")
+  }
+
+  it should "append the reserved hook inside the class before the next 
top-level statement" in {
+    val udfCodeWithSiblingDefinition =
+      """from pytexera import *
+        |
+        |class ProcessTupleOperator(UDFOperatorV2):
+        |    @overrides
+        |    def open(self):
+        |        print("open")
+        |
+        |    @overrides
+        |    def process_tuple(self, tuple_: Tuple, port: int):
+        |        yield tuple_
+        |
+        |def helper():
+        |    return "outside"
+        |""".stripMargin
+
+    val injectedCode =
+      inject(udfCodeWithSiblingDefinition, uiParameter("k", 
AttributeType.STRING, "v"))
+
+    val hookIndex = injectedCode.indexOf("def 
_texera_injected_ui_parameters(self)")
+    val processTupleIndex =
+      injectedCode.indexOf("def process_tuple(self, tuple_: Tuple, port: 
int):")
+    val helperIndex = injectedCode.indexOf("def helper():")
+
+    hookIndex should be >= 0
+    processTupleIndex should be < hookIndex
+    helperIndex should be > hookIndex
+  }
+
+  it should "append the reserved hook after triple-quoted strings that contain 
top-level-looking lines" in {
+    val udfCodeWithTripleQuotedString =
+      """from pytexera import *
+        |
+        |class ProcessTupleOperator(UDFOperatorV2):
+        |    def process_tuple(self, tuple_: Tuple, port: int):
+        |        sql = '''
+        |SELECT * FROM t
+        |'''
+        |        yield tuple_
+        |
+        |def helper():
+        |    return "outside"
+        |""".stripMargin
+
+    val injectedCode =
+      inject(udfCodeWithTripleQuotedString, uiParameter("k", 
AttributeType.STRING, "v"))
+
+    val hookIndex = injectedCode.indexOf("def 
_texera_injected_ui_parameters(self)")
+    val stringEndIndex = injectedCode.indexOf("'''\n        yield tuple_")
+    val helperIndex = injectedCode.indexOf("def helper():")
+
+    stringEndIndex should be >= 0
+    stringEndIndex should be < hookIndex
+    hookIndex should be < helperIndex
+  }
+
+  it should "preserve multiple UI parameters in the injected map" in {
+    val injectedCode = inject(
+      uiParameter("param1", AttributeType.DOUBLE, "12.5"),
+      uiParameter("param2", AttributeType.INTEGER, "1"),
+      uiParameter("param3", AttributeType.STRING, "Hola"),
+      uiParameter("param4", AttributeType.TIMESTAMP, "2026-02-28T03:15:00Z")
+    )
+
+    injectedCode should include("def _texera_injected_ui_parameters(self) -> 
Dict[str, Any]:")
+    injectedCode should include("self.decode_python_template")
+    decoderCallCount(injectedCode) shouldBe 8
+    injectedCode should not include ("import typing")
+  }
+
+  it should "throw when a parameter attribute is missing" in {
+    val invalidParameter = new UiUDFParameter
+    invalidParameter.attribute = null
+    invalidParameter.value = "anything"
+
+    val exception = the[RuntimeException] thrownBy {
+      inject(invalidParameter)
+    }
+
+    exception.getMessage should include("UiParameter attribute is required")
+  }
+
+  it should "throw when a UI parameter name is duplicated" in {
+    val exception = the[RuntimeException] thrownBy {
+      inject(
+        uiParameter("date", AttributeType.STRING, "2024-01-01"),
+        uiParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z")
+      )
+    }
+
+    exception.getMessage should include("UiParameter name 'date' is declared 
more than once")
+  }
+
+  Seq(AttributeType.BINARY, AttributeType.LARGE_BINARY).foreach { 
unsupportedType =>
+    it should s"throw when a UI parameter uses ${unsupportedType.name()} type" 
in {
+      val exception = the[RuntimeException] thrownBy {
+        inject(uiParameter("payload", unsupportedType, "68656c6c6f"))
+      }
+
+      exception.getMessage should include(
+        s"UiParameter type '${unsupportedType.name()}' is not supported"
+      )
+    }
+  }
+
+  it should "throw when the reserved hook is already defined by the user" in {
+    val udfWithReservedHook =
+      """from pytexera import *
+        |
+        |class ProcessTupleOperator(UDFOperatorV2):
+        |    def _texera_injected_ui_parameters(self):
+        |        return {}
+        |
+        |    def open(self):
+        |        pass
+        |""".stripMargin
+
+    val exception = the[RuntimeException] thrownBy {
+      inject(udfWithReservedHook, uiParameter("k", AttributeType.STRING, "v"))
+    }
+
+    exception.getMessage should include(
+      "Reserved method '_texera_injected_ui_parameters' is already defined"
+    )
+  }
+
+  it should "throw when UI parameters are provided but no supported user class 
is present" in {
+    val nonSupportedCode =
+      """from pytexera import *
+        |
+        |class SomethingElse:
+        |    def open(self):
+        |        pass
+        |""".stripMargin
+
+    val exception = the[RuntimeException] thrownBy {
+      inject(nonSupportedCode, uiParameter("k", AttributeType.STRING, "v"))
+    }
+
+    exception.getMessage should include("no supported Python UDF class was 
found")
+  }
+}

Reply via email to