This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 2b9add956c feat(huggingFace): refactor operator into per-task codegen + text-generation (#5278)
2b9add956c is described below
commit 2b9add956c9e63c3c4f6e717221a0c5e33e54875
Author: Prateek Ganigi <[email protected]>
AuthorDate: Mon Jun 15 15:27:49 2026 -0700
feat(huggingFace): refactor operator into per-task codegen + text-generation (#5278)
> ⚠️ This PR is stacked on #5124. Until that lands, the diff below
includes #5124's `HuggingFaceModelResource.scala` and the 1-line
registration in `TexeraWebApplication.scala`. The new code in this PR is
everything under
`common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/`
and the new test under
`common/workflow-operator/src/test/.../huggingFace/HuggingFaceInferenceOpDescSpec.scala`.
Once #5124 merges, this diff will auto-clean to ~839 lines.
### What changes were proposed in this PR?
Refactors the monolithic 1,278-line `HuggingFaceInferenceOpDesc` from
the team's feature branch into a dispatcher + per-task codegen
architecture and ships the first task family (text-generation):
- `codegen/TaskCodegen.scala` introduces the trait + `CodegenContext`
that model per-task variation.
- `codegen/PythonCodegenBase.scala` emits the shared provider-fallback /
`process_table` / `_parse_response` infrastructure with two holes for
the per-task payload and parse snippets.
- `codegen/TextGenCodegen.scala` supplies text-generation's
chat-completions payload and the
`body["choices"][0]["message"]["content"]` parse branch.
- `HuggingFaceInferenceOpDesc.scala` becomes a thin (~180-line)
dispatcher holding the `@JsonProperty` fields and the
`registeredCodegens` map.
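
The dispatcher-plus-holes arrangement described above can be pictured with a small Python sketch. It is not the Scala code from this commit: `TaskCodegen`, `REGISTERED_CODEGENS`, `codegen_for_task`, `SHARED_TEMPLATE`, and `render` are simplified, hypothetical stand-ins, and the template holds only two toy functions. It assumes, as the operator's Scaladoc states, that an unrecognized task string falls back to the text-generation codegen.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskCodegen:
    """Hypothetical stand-in for the Scala trait: a task name plus two snippets."""
    task: str
    payload_python: str
    parse_python: str


TEXT_GEN = TaskCodegen(
    task="text-generation",
    payload_python='payload = {"messages": [{"role": "user", "content": prompt_value}]}',
    parse_python='return body["choices"][0]["message"]["content"]',
)

# Registry keyed by task string; new task families would add entries here.
REGISTERED_CODEGENS = {TEXT_GEN.task: TEXT_GEN}


def codegen_for_task(task):
    # Unknown task strings fall back to text-generation, so rendering never raises.
    return REGISTERED_CODEGENS.get(task, TEXT_GEN)


# Shared infrastructure with two holes, one per snippet.
SHARED_TEMPLATE = """def build_payload(prompt_value):
    {payload}
    return payload

def parse_response(body):
    {parse}
"""


def render(task):
    codegen = codegen_for_task(task)
    return SHARED_TEMPLATE.format(
        payload=codegen.payload_python, parse=codegen.parse_python
    )


if __name__ == "__main__":
    print(render("text-generation"))
```

In this picture, adding a task family amounts to defining one more `TaskCodegen` value and registering it; the shared template stays untouched.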
User-input string fields are typed `EncodableString` and emitted via the
`pyb"..."` macro so values reach Python as
`self.decode_python_template('<base64>')` rather than raw literals.
Class constants are assigned in `open(self)` so `self` is in scope for
the decode call. The generated `process_table` runs a defensive
`_HF_MODEL_ID_PATTERN` check at runtime before any HF URL is composed.
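
As a rough illustration of the runtime model-ID check, the sketch below applies the same regular expression in plain Python. The pattern is copied from the generated script in this commit (the Scala template writes `$$` to produce a single `$`, assuming the `pyb` interpolator follows standard Scala interpolator escaping); the helper name `is_valid_model_id` is made up for the example.

```python
import re

# Pattern as it appears in the generated Python script.
_HF_MODEL_ID_PATTERN = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*(/[A-Za-z0-9._-]+)+$")


def is_valid_model_id(model_id):
    """Hypothetical helper mirroring the guard at the top of process_table."""
    return bool(_HF_MODEL_ID_PATTERN.match(model_id or ""))


if __name__ == "__main__":
    for candidate in [
        "Qwen/Qwen2.5-72B-Instruct",  # org/model form
        "gpt2",                       # no slash
        "org/model?revision=main",    # query string
        "org/model#frag",             # fragment
        "org/../model",               # dot-only segment
    ]:
        print(candidate, is_valid_model_id(candidate))
```

Two properties follow directly from the pattern: IDs without a slash do not match, and because `.` belongs to the segment character class, an ID such as `org/../model` does match. The check as written therefore filters query strings, fragments, and spaces, but not dot-only segments.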
The `TaskCodegen` trait also exposes a `tasks: Set[String]` default so a
single codegen can register under multiple task strings; this becomes
relevant in PR 3 (image family).
### Any related issues, documentation, or discussions?
Tracked in #5277 and #5041 (umbrella issue for the HuggingFace operator
end-to-end implementation).
Closes #5277
Stacked on #5124 (PR 1 - REST resource).
This is PR 2 of a multi-PR series landing the HuggingFace operator
end-to-end. The full plan and umbrella issue live separately; this PR's
scope is exactly the dispatcher pattern + text-generation codegen.
### How was this PR tested?
- `sbt "WorkflowOperator/compile; WorkflowOperator/Test/compile"` clean.
- `sbt scalafmtCheck` clean.
- `sbt "WorkflowOperator/testOnly
org.apache.texera.amber.operator.huggingFace.HuggingFaceInferenceOpDescSpec"`
- 10/10 pass (operator info, validation, codegen wiring, MODEL_ID
runtime check, leak-prevention, clamping, schema).
- `sbt "WorkflowOperator/testOnly
org.apache.texera.amber.util.PythonCodeRawInvalidTextSpec"` - 117/117
descriptors `py_compile` cleanly, no raw-text leaks. The new operator is
included in this scan.
- Generated Python verified via `python3 -m py_compile` on a sample
output.
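
The clamping case in the list above refers to the bounds applied to `maxNewTokens` and `temperature` before code generation. A minimal Python rendering of that arithmetic follows; the function names are hypothetical, while the bounds and defaults (1 to 4096 with default 256, and 0.0 to 2.0 with default 0.7) are taken from `generatePythonCode` in this commit.

```python
def clamp_max_new_tokens(value, default=256):
    # A missing value falls back to the default, then is clamped to [1, 4096].
    v = default if value is None else int(value)
    return max(1, min(v, 4096))


def clamp_temperature(value, default=0.7):
    # A missing value falls back to the default, then is clamped to [0.0, 2.0].
    v = default if value is None else float(value)
    return max(0.0, min(v, 2.0))


if __name__ == "__main__":
    print(clamp_max_new_tokens(10_000), clamp_temperature(-1.0))
```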
### Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.7
---------
Co-authored-by: Elliot Lin <[email protected]>
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
Co-authored-by: Xuan Gu <[email protected]>
---
.../apache/texera/amber/operator/LogicalOp.scala | 2 +
.../huggingFace/HuggingFaceInferenceOpDesc.scala | 194 +++++++++++
.../huggingFace/codegen/PythonCodegenBase.scala | 376 +++++++++++++++++++++
.../operator/huggingFace/codegen/TaskCodegen.scala | 77 +++++
.../huggingFace/codegen/TextGenCodegen.scala | 54 +++
.../HuggingFaceInferenceOpDescSpec.scala | 202 +++++++++++
6 files changed, 905 insertions(+)
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
index 4e9d6c6e2c..55e241ecaf 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala
@@ -43,6 +43,7 @@ import org.apache.texera.amber.operator.dummy.DummyOpDesc
import org.apache.texera.amber.operator.filter.SpecializedFilterOpDesc
import org.apache.texera.amber.operator.hashJoin.HashJoinOpDesc
import org.apache.texera.amber.operator.huggingFace.{
+ HuggingFaceInferenceOpDesc,
HuggingFaceIrisLogisticRegressionOpDesc,
HuggingFaceSentimentAnalysisOpDesc,
HuggingFaceSpamSMSDetectionOpDesc,
@@ -396,6 +397,7 @@ trait StateTransferFunc
),
new Type(value = classOf[SklearnDummyClassifierOpDesc], name = "SklearnDummyClassifier"),
new Type(value = classOf[SklearnPredictionOpDesc], name = "SklearnPrediction"),
+ new Type(value = classOf[HuggingFaceInferenceOpDesc], name = "HuggingFace"),
new Type(
value = classOf[HuggingFaceSentimentAnalysisOpDesc],
name = "HuggingFaceSentimentAnalysis"
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/HuggingFaceInferenceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/HuggingFaceInferenceOpDesc.scala
new file mode 100644
index 0000000000..07466c898e
--- /dev/null
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/HuggingFaceInferenceOpDesc.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.huggingFace
+
+import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription}
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
+import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PortIdentity}
+import org.apache.texera.amber.operator.PythonOperatorDescriptor
+import org.apache.texera.amber.operator.huggingFace.codegen.{
+ CodegenContext,
+ PythonCodegenBase,
+ TaskCodegen,
+ TextGenCodegen
+}
+import org.apache.texera.amber.operator.metadata.annotations.AutofillAttributeName
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
+import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableString
+
+/**
+ * Generic Hugging Face inference operator.
+ *
+ * This is the first slice of a feature that will eventually cover ~20 HF
+ * pipeline tasks. PR 2 ships text-generation only; image, audio,
+ * media-generation, and QA task families land in subsequent PRs as new
+ * `TaskCodegen` implementations registered in `registeredCodegens`.
+ *
+ * The Python script that runs at execution time is assembled by
+ * `PythonCodegenBase.render(ctx, codegen)`, which composes the shared
+ * provider-fallback / request-loop infrastructure with the per-task
+ * payload + parse snippets supplied by the selected `TaskCodegen`.
+ *
+ * User-provided string fields are typed as [[EncodableString]] so the
+ * `pyb"..."` macro inside `PythonCodegenBase` emits them as
+ * base64-decoded expressions at runtime instead of raw Python literals —
+ * this is what allows the operator to satisfy
+ * `PythonCodeRawInvalidTextSpec`'s contract that arbitrary `@JsonProperty`
+ * values must not leak into generated source.
+ */
+class HuggingFaceInferenceOpDesc extends PythonOperatorDescriptor {
+
+ @JsonProperty(value = "hfApiToken", required = true)
+ @JsonSchemaTitle("HF API Token")
+ @JsonPropertyDescription(
+ "Your Hugging Face API token (from https://huggingface.co/settings/tokens)"
+ )
+ var hfApiToken: EncodableString = ""
+
+ @JsonProperty(value = "task", required = true, defaultValue = "text-generation")
+ @JsonSchemaTitle("Task")
+ @JsonPropertyDescription("The Hugging Face pipeline task type")
+ var task: EncodableString = "text-generation"
+
+ @JsonProperty(
+ value = "modelId",
+ required = true,
+ defaultValue = "Qwen/Qwen2.5-72B-Instruct"
+ )
+ @JsonSchemaTitle("Model")
+ @JsonPropertyDescription("Select a Hugging Face model")
+ var modelId: EncodableString = "Qwen/Qwen2.5-72B-Instruct"
+
+ @JsonProperty(value = "promptColumn", required = true)
+ @JsonSchemaTitle("Prompt Column")
+ @JsonPropertyDescription("Column in the input table to use as the user prompt")
+ @AutofillAttributeName
+ var promptColumn: EncodableString = ""
+
+ @JsonProperty(
+ value = "systemPrompt",
+ required = false,
+ defaultValue = "You are a helpful assistant."
+ )
+ @JsonSchemaTitle("System Prompt")
+ @JsonPropertyDescription("Optional system message to set model behavior")
+ var systemPrompt: EncodableString = "You are a helpful assistant."
+
+ @JsonProperty(value = "maxNewTokens", required = false, defaultValue = "256")
+ @JsonSchemaTitle("Max New Tokens")
+ @JsonPropertyDescription("Maximum number of tokens to generate (1-4096)")
+ var maxNewTokens: java.lang.Integer = 256
+
+ @JsonProperty(value = "temperature", required = false)
+ @JsonSchemaTitle("Temperature")
+ @JsonPropertyDescription("Sampling temperature (0.0 = deterministic, up to 2.0)")
+ var temperature: java.lang.Double = 0.7
+
+ @JsonProperty(
+ value = "resultColumn",
+ required = false,
+ defaultValue = "hf_response"
+ )
+ @JsonSchemaTitle("Result Column Name")
+ @JsonPropertyDescription("Name of the new column added to the output table")
+ var resultColumn: EncodableString = "hf_response"
+
+ /**
+ * Per-task code generators. New entries are added as task families land
+ * in subsequent PRs (e.g. ImageTaskCodegen, AudioTaskCodegen, etc.).
+ *
+ * An unrecognized task string falls back to [[TextGenCodegen]]; the
+ * generated Python's `else` branch then produces a generic `{"inputs":
+ * prompt_value}` payload and the HF endpoint surfaces the real error at
+ * runtime. This matches the original monolithic operator's behavior and
+ * keeps `generatePythonCode` total (it never throws on arbitrary input,
+ * which is required by `PythonCodeRawInvalidTextSpec`).
+ */
+ private val registeredCodegens: Map[String, TaskCodegen] =
+ Map(TextGenCodegen.task -> TextGenCodegen)
+
+ private def codegenForTask(t: String): TaskCodegen =
+ registeredCodegens.getOrElse(t, TextGenCodegen)
+
+ /**
+ * The output column name to use in generated Python and in the output
+ * schema. Falls back to the `"hf_response"` sentinel when the user
+ * leaves the field null or blank.
+ *
+ * Shared between [[generatePythonCode]] and [[getOutputSchemas]] so the
+ * two never drift apart (a divergence would cause the Python operator
+ * to write to a column the schema didn't declare). Returns
+ * [[EncodableString]] rather than `String` so the value flows into the
+ * `pyb` template with the encoding annotation intact.
+ */
+ private def resolvedResultColumn: EncodableString =
+ if (resultColumn == null || resultColumn.trim.isEmpty) "hf_response"
+ else resultColumn
+
+ override def generatePythonCode(): String = {
+ val safeTask: EncodableString =
+ if (task == null || task.trim.isEmpty) "text-generation" else task
+ val safeModelId: EncodableString =
+ if (modelId == null) "" else modelId.trim
+ val safePromptCol: EncodableString =
+ if (promptColumn == null) "" else promptColumn
+ val safeResultCol: EncodableString = resolvedResultColumn
+ val safeSystemPrompt: EncodableString =
+ if (systemPrompt == null) "" else systemPrompt
+ val safeToken: EncodableString =
+ if (hfApiToken == null) "" else hfApiToken
+
+ val safeMaxTokens =
+ math.max(1, math.min(if (maxNewTokens != null) maxNewTokens.intValue else 256, 4096))
+ val safeTemp =
+ math.max(0.0, math.min(if (temperature != null) temperature.doubleValue else 0.7, 2.0))
+
+ val ctx = CodegenContext(
+ hfApiToken = safeToken,
+ modelId = safeModelId,
+ promptColumn = safePromptCol,
+ resultColumn = safeResultCol,
+ task = safeTask,
+ systemPrompt = safeSystemPrompt,
+ safeMaxTokens = safeMaxTokens,
+ safeTemp = safeTemp
+ )
+
+ PythonCodegenBase.render(ctx, codegenForTask(safeTask))
+ }
+
+ override def operatorInfo: OperatorInfo =
+ OperatorInfo(
+ "Hugging Face",
+ "Call a Hugging Face model via the Inference API",
+ OperatorGroupConstants.HUGGINGFACE_GROUP,
+ inputPorts = List(InputPort()),
+ outputPorts = List(OutputPort())
+ )
+
+ override def getOutputSchemas(
+ inputSchemas: Map[PortIdentity, Schema]
+ ): Map[PortIdentity, Schema] =
+ Map(
+ operatorInfo.outputPorts.head.id -> inputSchemas.values.head
+ .add(resolvedResultColumn, AttributeType.STRING)
+ )
+}
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/PythonCodegenBase.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/PythonCodegenBase.scala
new file mode 100644
index 0000000000..16c2cc9bbb
--- /dev/null
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/PythonCodegenBase.scala
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.huggingFace.codegen
+
+import org.apache.texera.amber.pybuilder.PythonTemplateBuilder.PythonTemplateBuilderStringContext
+
+/**
+ * Builds the Python script emitted by HuggingFaceInferenceOpDesc.
+ *
+ * The script defines a `ProcessTableOperator` class with:
+ * - Per-instance configuration set in `open(self)` from base64-encoded
+ * values that the `pyb"..."` macro decodes at runtime (so user-input
+ * strings never appear as raw Python literals in the source).
+ * - A provider-fallback system that walks the HF Hub's inference-provider
+ * list cheapest-first and tries each provider's native chat-completions
+ * route, with HF Inference Router as the default.
+ * - A `process_table` loop that validates the prompt column, builds the
+ * per-row payload via the per-task codegen, posts to the resolved
+ * provider, and parses the response.
+ * - A `_parse_response` task switch whose branches are provided by the
+ * per-task codegen.
+ *
+ * Per-task variation lives in `TaskCodegen` implementations. This class
+ * holds only what is shared across all HF tasks; per-task helpers (image
+ * loading, audio MIME inference, media-URL fetching, etc.) will be added
+ * in subsequent PRs as the corresponding task families land.
+ */
+object PythonCodegenBase {
+
+ def render(ctx: CodegenContext, codegen: TaskCodegen): String = {
+ val payload = codegen.payloadPython(ctx)
+ val parse = codegen.parsePython(ctx)
+ val hfApiToken = ctx.hfApiToken
+ val modelId = ctx.modelId
+ val promptColumn = ctx.promptColumn
+ val resultColumn = ctx.resultColumn
+ val task = ctx.task
+ val systemPrompt = ctx.systemPrompt
+ val maxNewTokens = ctx.safeMaxTokens
+ val temperature = ctx.safeTemp
+ pyb"""import os
+ |import re
+ |import json
+ |import requests
+ |import pandas as pd
+ |from pytexera import *
+ |
+ |# Defensive format check for MODEL_ID before it is interpolated into
+ |# HF URL paths. The base host is hardcoded so the worst case isn't
+ |# SSRF, but rejecting `..` segments / query strings / fragments /
+ |# control chars keeps the operator's request shape predictable.
+ |_HF_MODEL_ID_PATTERN = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*(/[A-Za-z0-9._-]+)+$$")
+ |
+ |class ProcessTableOperator(UDFTableOperator):
+ |
+ | # Providers ranked cheapest-first (lower index = cheaper).
+ | # Unknown providers are appended at the end.
+ | PROVIDER_COST_PRIORITY = [
+ | "hf-inference",
+ | "cerebras",
+ | "sambanova",
+ | "groq",
+ | "novita",
+ | "nebius",
+ | "fireworks-ai",
+ | "together",
+ | "hyperbolic",
+ | "scaleway",
+ | "nscale",
+ | "ovhcloud",
+ | "deepinfra",
+ | "featherless-ai",
+ | "baseten",
+ | "publicai",
+ | "nvidia",
+ | "openai",
+ | "cohere",
+ | "clarifai",
+ | ]
+ |
+ | # Per-provider chat-completions route overrides. Providers not listed
+ | # here use the default `v1/chat/completions` path. Single source of
+ | # truth for both _post_with_fallback (text-gen) and _call_provider
+ | # (OpenAI-compatible fallback) so the two stay in sync as providers
+ | # are added.
+ | CHAT_ROUTES = {
+ | "groq": "openai/v1/chat/completions",
+ | "fireworks-ai": "inference/v1/chat/completions",
+ | "cohere": "compatibility/v1/chat/completions",
+ | "clarifai": "v2/ext/openai/v1/chat/completions",
+ | "deepinfra": "v1/openai/chat/completions",
+ | }
+ |
+ | # Third-party providers that speak the OpenAI chat-completions
+ | # protocol. Used by _call_provider's OpenAI-compatible branch.
+ | OPENAI_COMPATIBLE_PROVIDERS = (
+ | "cerebras", "sambanova", "groq", "novita", "nebius",
+ | "fireworks-ai", "together", "hyperbolic", "cohere", "clarifai",
+ | "deepinfra", "featherless-ai", "nscale", "nvidia", "openai",
+ | "ovhcloud", "publicai", "scaleway", "baseten",
+ | )
+ |
+ | def open(self):
+ | # User-provided strings reach the operator via base64-encoded
+ | # decode expressions so they cannot break Python syntax or
+ | # leak raw text into the generated source.
+ | self.HF_API_TOKEN = $hfApiToken
+ | self.MODEL_ID = $modelId
+ | self.PROMPT_COLUMN = $promptColumn
+ | self.RESULT_COLUMN = $resultColumn
+ | self.TASK = $task
+ | self.SYSTEM_PROMPT = $systemPrompt
+ | self.MAX_NEW_TOKENS = $maxNewTokens
+ | self.TEMPERATURE = $temperature
+ |
+ | def _resolve_providers(self, token):
+ | '''Query the HF Hub API for inference providers serving this model.
+ | Returns a list of dicts with 'name' and 'providerId' sorted
+ | cheapest-first. Falls back to hf-inference if anything goes wrong.
+ | '''
+ | try:
+ | resp = requests.get(
+ | f"https://huggingface.co/api/models/{self.MODEL_ID}",
+ | headers={"Authorization": f"Bearer {token}"},
+ | params={"expand[]": "inferenceProviderMapping"},
+ | timeout=30,
+ | )
+ | if resp.status_code == 200:
+ | data = resp.json()
+ | mapping = (
+ | data.get("inferenceProviderMapping")
+ | or data.get("inference_provider_mapping")
+ | or {}
+ | )
+ | if mapping:
+ | live = [
+ | {
+ | "name": p,
+ | "providerId": v.get("providerId", self.MODEL_ID),
+ | "task": v.get("task", ""),
+ | "isModelAuthor": v.get("isModelAuthor", False),
+ | }
+ | for p, v in mapping.items()
+ | if isinstance(v, dict) and v.get("status") == "live"
+ | ]
+ | if live:
+ | priority = {name: idx for idx, name in enumerate(self.PROVIDER_COST_PRIORITY)}
+ | live.sort(key=lambda prov: priority.get(prov["name"], len(self.PROVIDER_COST_PRIORITY)))
+ | return live
+ | except Exception:
+ | pass
+ | return [{"name": "hf-inference", "providerId": self.MODEL_ID}]
+ |
+ | def _post_with_fallback(self, providers, json_headers, pipeline_payload, prompt_value):
+ | '''Try providers in order, using the correct API route for each.
+ | Returns (response, provider_summary). provider_summary is None on
+ | success or a string describing what failed.
+ | '''
+ | RETRYABLE = (400, 404, 422, 429, 502, 503)
+ | last_resp = None
+ | errors = []
+ | for prov in providers:
+ | provider_name = prov["name"]
+ | provider_id = prov["providerId"]
+ | try:
+ | if self.TASK == "text-generation":
+ | route = self.CHAT_ROUTES.get(provider_name, "v1/chat/completions")
+ | url = f"https://router.huggingface.co/{provider_name}/{route}"
+ | resp = requests.post(url, headers=json_headers, json=pipeline_payload, timeout=120)
+ | elif provider_name == "hf-inference":
+ | url = f"https://router.huggingface.co/hf-inference/models/{self.MODEL_ID}"
+ | resp = requests.post(url, headers=json_headers, json=pipeline_payload, timeout=120)
+ | else:
+ | resp = self._call_provider(provider_name, provider_id, json_headers, pipeline_payload, prompt_value)
+ | except Exception as e:
+ | errors.append(f"{provider_name}: {type(e).__name__}")
+ | continue
+ | if resp.status_code in (200, 201):
+ | return resp, None
+ | if resp.status_code == 401:
+ | return resp, None
+ | try:
+ | detail = resp.json().get("error", resp.text[:200])
+ | except Exception:
+ | detail = resp.text[:200] if resp.text else "no details"
+ | errors.append(f"{provider_name}: HTTP {resp.status_code} - {detail}")
+ | last_resp = resp
+ | if resp.status_code not in RETRYABLE:
+ | return resp, "; ".join(errors)
+ | summary = "; ".join(errors) if errors else "no providers available"
+ | return last_resp, summary
+ |
+ | def _call_provider(self, provider_name, provider_id, json_headers, pipeline_payload, prompt_value):
+ | '''Route to a third-party provider using its native API format.
+ | For the text-gen-only build this covers the OpenAI-compatible chat
+ | providers and an unknown-provider fallback that tries the pipeline
+ | format then chat completions. Image / audio / media routing will
+ | be added in subsequent PRs alongside the corresponding task
+ | codegens.
+ | '''
+ | base = f"https://router.huggingface.co/{provider_name}"
+ | if provider_name in self.OPENAI_COMPATIBLE_PROVIDERS:
+ | url = f"{base}/{self.CHAT_ROUTES.get(provider_name, 'v1/chat/completions')}"
+ | messages = [{"role": "user", "content": prompt_value}]
+ | return requests.post(
+ | url,
+ | headers=json_headers,
+ | json={"model": provider_id, "messages": messages},
+ | timeout=120,
+ | )
+ |
+ | # Unknown provider: try pipeline format, then chat completions.
+ | url = f"{base}/{provider_id}"
+ | resp = requests.post(url, headers=json_headers, json=pipeline_payload, timeout=120)
+ | if resp.status_code in (400, 404, 422):
+ | url = f"{base}/v1/chat/completions"
+ | messages = [{"role": "user", "content": prompt_value}]
+ | resp2 = requests.post(
+ | url,
+ | headers=json_headers,
+ | json={"model": provider_id, "messages": messages},
+ | timeout=120,
+ | )
+ | if resp2.status_code == 200:
+ | return resp2
+ | return resp
+ |
+ | @overrides
+ | def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
+ | prompt_col = self.PROMPT_COLUMN
+ | result_col = self.RESULT_COLUMN
+ | task = self.TASK
+ |
+ | # --- validate MODEL_ID format before any HF URL is built ---
+ | if not _HF_MODEL_ID_PATTERN.match(self.MODEL_ID or ""):
+ | raise ValueError(
+ | f"Invalid Hugging Face model ID '{self.MODEL_ID}'. "
+ | f"Expected format like 'org/model-name' or 'org/model-name/revision'."
+ | )
+ |
+ | # --- resolve API token ---
+ | token = self.HF_API_TOKEN if self.HF_API_TOKEN else os.environ.get("HF_TOKEN", "")
+ | if not token:
+ | raise ValueError(
+ | "Hugging Face API token is not set. "
+ | "Provide it in the operator config or via HF_TOKEN env var."
+ | )
+ |
+ | # --- resolve all available inference providers for this model (tried in order) ---
+ | providers = self._resolve_providers(token)
+ |
+ | # --- validate prompt column exists ---
+ | assert prompt_col in table.columns, (
+ | f"Prompt column '{prompt_col}' not found in input table. "
+ | f"Available columns: {list(table.columns)}"
+ | )
+ |
+ | # --- handle empty table ---
+ | if table.empty:
+ | table[result_col] = pd.Series(dtype="object")
+ | yield table
+ | return
+ |
+ | json_headers = {
+ | "Authorization": f"Bearer {token}",
+ | "Content-Type": "application/json",
+ | }
+ |
+ | results = []
+ | for idx, row in table.iterrows():
+ | prompt_value = row[prompt_col]
+ | if pd.isna(prompt_value):
+ | prompt_value = ""
+ | else:
+ | prompt_value = str(prompt_value)
+ |
+ | # --- build task-specific payload (provided by per-task codegen) ---
+ |${payload}
+ |
+ | try:
+ | resp, provider_summary = self._post_with_fallback(
+ | providers, json_headers, payload, prompt_value
+ | )
+ |
+ | if resp is None:
+ | results.append(
+ | self._format_error(
+ | "All inference providers failed",
+ | f"No provider could serve model '{self.MODEL_ID}'. "
+ | f"Tried: {provider_summary}"
+ | )
+ | )
+ | continue
+ |
+ | if resp.status_code == 429:
+ | results.append(
+ | self._format_http_error(
+ | "HF API rate limit hit, retry later", resp.status_code, resp.text
+ | )
+ | )
+ | continue
+ | if resp.status_code == 401:
+ | results.append(
+ | self._format_http_error("Invalid HF API token", resp.status_code, resp.text)
+ | )
+ | continue
+ | if resp.status_code not in (200, 201):
+ | results.append(
+ | self._format_error(
+ | "All inference providers failed",
+ | f"No provider could serve model '{self.MODEL_ID}'. "
+ | f"Tried: {provider_summary}"
+ | )
+ | )
+ | continue
+ |
+ | try:
+ | body = resp.json()
+ | except ValueError:
+ | body = resp.text
+ | content = self._parse_response(body)
+ | results.append(content)
+ |
+ | except Exception as e:
+ | import warnings
+ | warnings.warn(
+ | f"Row {idx}: request failed ({type(e).__name__}: {e}), "
+ | f"setting result to readable error text."
+ | )
+ | results.append(self._format_error("Request failed", f"{type(e).__name__}: {e}"))
+ |
+ | table[result_col] = results
+ | yield table
+ |
+ | def _format_error(self, title, detail):
+ | return f"{title}: {detail}"
+ |
+ | def _format_http_error(self, title, status_code, response_text):
+ | # Cap at 200 chars to match the truncation in _post_with_fallback's
+ | # error-detail extraction; a large body / HTML error page would
+ | # otherwise land verbatim in the result cell.
+ | detail = response_text.strip()[:200]
+ | if not detail:
+ | detail = "<empty response>"
+ | return f"{title} [status={status_code}] response={detail}"
+ |
+ | def _parse_response(self, body):
+ | task = self.TASK
+ | try:
+ | if isinstance(body, str):
+ | return body
+ |${parse}
+ | else:
+ | return json.dumps(body)
+ | except (KeyError, IndexError, TypeError):
+ | return json.dumps(body)
+ |""".encode
+ }
+}
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/TaskCodegen.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/TaskCodegen.scala
new file mode 100644
index 0000000000..333d1a038c
--- /dev/null
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/TaskCodegen.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.huggingFace.codegen
+
+import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableString
+
+/**
+ * Inputs the dispatcher passes through to each TaskCodegen.
+ *
+ * User-provided string fields are typed as [[EncodableString]] so the
+ * `pyb"..."` macro in [[PythonCodegenBase]] emits them as base64-decoded
+ * runtime expressions rather than raw Python string literals — required to
+ * pass `PythonCodeRawInvalidTextSpec`'s leakage check.
+ */
+final case class CodegenContext(
+ hfApiToken: EncodableString,
+ modelId: EncodableString,
+ promptColumn: EncodableString,
+ resultColumn: EncodableString,
+ task: EncodableString,
+ systemPrompt: EncodableString,
+ safeMaxTokens: Int,
+ safeTemp: Double
+)
+
+/**
+ * A bundle of Python snippets that customize generated inference code for
+ * one Hugging Face pipeline task family.
+ *
+ * Concrete implementations are `object`s registered in
+ * `HuggingFaceInferenceOpDesc.registeredCodegens`. New task families
+ * (image, audio, QA, etc.) land in subsequent PRs by introducing new
+ * `*Codegen` objects and adding them to that map.
+ *
+ * Snippets returned by these methods are Python source spliced into the
+ * shared template assembled by [[PythonCodegenBase.render]]. Snippets must
+ * NOT directly inline user-provided strings — reference the per-instance
+ * attributes `self.HF_API_TOKEN`, `self.MODEL_ID`, `self.PROMPT_COLUMN`,
+ * etc. that the base class initializes from `CodegenContext` via the
+ * `pyb` macro's safe encoding. The snippet author is responsible for the
+ * correct indentation column (see existing implementations).
+ */
+trait TaskCodegen {
+
+ /** Canonical Hugging Face pipeline task string, e.g. "text-generation". */
+ def task: String
+
+ /** Python text that assigns `payload = …` for one row inside
+ * `process_table`'s per-row loop. The snippet supplies its own leading
+ * `if`/`elif task == "...":` opener and any `else` fallback.
+ */
+ def payloadPython(ctx: CodegenContext): String
+
+ /** Python text for the body of `_parse_response`'s task switch. The
+ * snippet supplies its own leading `if`/`elif task == "...":` opener.
+ * The base class wraps the result in the try/except matching the
+ * source layout.
+ */
+ def parsePython(ctx: CodegenContext): String
+}
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/TextGenCodegen.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/TextGenCodegen.scala
new file mode 100644
index 0000000000..b836de9e12
--- /dev/null
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/TextGenCodegen.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.huggingFace.codegen
+
+/**
+ * Codegen for the `text-generation` Hugging Face pipeline task.
+ *
+ * The payload is the OpenAI chat-completions shape — `messages` with a
+ * system + user pair plus `max_tokens` / `temperature` knobs — which is
+ * what the HF router and every OpenAI-compatible third-party provider
+ * (Cerebras, Groq, Sambanova, Together, …) accepts.
+ *
+ * The parse step pulls `body["choices"][0]["message"]["content"]` out of
+ * the response.
+ */
+object TextGenCodegen extends TaskCodegen {
+
+ override val task: String = "text-generation"
+
+ override def payloadPython(ctx: CodegenContext): String =
+ """ if task == "text-generation":
+ | payload = {
+ | "model": self.MODEL_ID,
+ | "messages": [
+ | {"role": "system", "content": self.SYSTEM_PROMPT},
+ | {"role": "user", "content": prompt_value},
+ | ],
+ | "max_tokens": self.MAX_NEW_TOKENS,
+ | "temperature": self.TEMPERATURE,
+ | }
+ | else:
+ | payload = {"inputs": prompt_value}""".stripMargin
+
+ override def parsePython(ctx: CodegenContext): String =
+ """ if task == "text-generation":
+ | return body["choices"][0]["message"]["content"]""".stripMargin
+}
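
To see what the two snippets above do at runtime without the generated operator around them, here is a minimal standalone Python sketch. The function names `build_payload` and `parse_response` are hypothetical and do not appear in the PR. The fallback in `parse_response` for other tasks (`str(body)`) is also an assumption, because the diff shows only the `text-generation` branch.

```python
from typing import Any, Dict


def build_payload(
    task: str,
    model_id: str,
    system_prompt: str,
    prompt_value: str,
    max_new_tokens: int,
    temperature: float,
) -> Dict[str, Any]:
    """Mirror of the payload snippet: chat-completions shape for text-generation."""
    if task == "text-generation":
        return {
            "model": model_id,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt_value},
            ],
            "max_tokens": max_new_tokens,
            "temperature": temperature,
        }
    # Generic branch taken by any other (including unrecognized) task string.
    return {"inputs": prompt_value}


def parse_response(task: str, body: Dict[str, Any]) -> str:
    """Mirror of the parse snippet for text-generation."""
    if task == "text-generation":
        return body["choices"][0]["message"]["content"]
    # Assumption: the real fallback lives in the shared base and is not shown here.
    return str(body)
```

In this sketch an unrecognized task takes the generic `{"inputs": ...}` branch, which is the behaviour the spec below asserts for `task = "not-a-real-task"`.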
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/huggingFace/HuggingFaceInferenceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/huggingFace/HuggingFaceInferenceOpDescSpec.scala
new file mode 100644
index 0000000000..06424df604
--- /dev/null
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/huggingFace/HuggingFaceInferenceOpDescSpec.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.huggingFace
+
+import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
+import org.apache.texera.amber.core.workflow.PortIdentity
+import org.apache.texera.amber.operator.huggingFace.codegen.{CodegenContext, TextGenCodegen}
+import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
+import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableString
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+class HuggingFaceInferenceOpDescSpec extends AnyFlatSpec with Matchers {
+
+ private def makeDesc(
+ token: EncodableString = "token",
+ modelId: EncodableString = "Qwen/Qwen2.5-72B-Instruct",
+ promptColumn: EncodableString = "prompt",
+ task: EncodableString = "text-generation",
+ systemPrompt: EncodableString = "You are a helpful assistant.",
+ maxNewTokens: Int = 256,
+ temperature: Double = 0.7,
+ resultColumn: EncodableString = "hf_response"
+ ): HuggingFaceInferenceOpDesc = {
+ val desc = new HuggingFaceInferenceOpDesc()
+ desc.hfApiToken = token
+ desc.modelId = modelId
+ desc.promptColumn = promptColumn
+ desc.task = task
+ desc.systemPrompt = systemPrompt
+ desc.maxNewTokens = maxNewTokens
+ desc.temperature = temperature
+ desc.resultColumn = resultColumn
+ desc
+ }
+
+ "HuggingFaceInferenceOpDesc.operatorInfo" should
+ "advertise the user-friendly name, HuggingFace group, and one input/output port" in {
+ val info = (new HuggingFaceInferenceOpDesc).operatorInfo
+ info.userFriendlyName shouldBe "Hugging Face"
+ info.operatorGroupName shouldBe OperatorGroupConstants.HUGGINGFACE_GROUP
+ info.inputPorts.size shouldBe 1
+ info.outputPorts.size shouldBe 1
+ }
+
+ "generatePythonCode" should
+ "fall back to the text-gen codegen on an unrecognized task (HF reports the real error at runtime)" in {
+ // generatePythonCode must be total — never throw on arbitrary @JsonProperty
+ // values — per the PythonCodeRawInvalidTextSpec contract. An unknown task
+ // routes through TextGenCodegen, whose payload `if/else` hits the generic
+ // `{"inputs": prompt_value}` branch at runtime.
+ val code = makeDesc(task = "not-a-real-task").generatePythonCode()
+ code should include("""payload = {"inputs": prompt_value}""")
+ }
+
+ it should "emit a ProcessTableOperator that initializes config in open()" in {
+ val code = makeDesc().generatePythonCode()
+ code should include("class ProcessTableOperator(UDFTableOperator):")
+ code should include("def open(self):")
+ // User-input strings are decoded at runtime, not embedded as literals.
+ code should include("self.HF_API_TOKEN = self.decode_python_template(")
+ code should include("self.MODEL_ID = self.decode_python_template(")
+ code should include("self.PROMPT_COLUMN = self.decode_python_template(")
+ code should include("self.TASK = self.decode_python_template(")
+ code should include("self.SYSTEM_PROMPT = self.decode_python_template(")
+ }
+
+ it should "wire the text-gen payload and response parse correctly" in {
+ val code = makeDesc().generatePythonCode()
+ // Payload — chat-completions shape against the configured model + system prompt.
+ code should include("self.MODEL_ID")
+ code should include("self.SYSTEM_PROMPT")
+ code should include("self.MAX_NEW_TOKENS")
+ code should include("self.TEMPERATURE")
+ // Parse — text-gen pulls choices[0].message.content out of the response.
+ code should include("""body["choices"][0]["message"]["content"]""")
+ }
+
+ it should
+ "emit a runtime check that rejects malformed MODEL_ID values before any HF URL is built" in {
+ val code = makeDesc().generatePythonCode()
+ // Pattern that fences MODEL_ID to org/model-name (allowing org/model-name/revision).
+ code should include("_HF_MODEL_ID_PATTERN = re.compile(")
+ // Runtime fail-fast inside process_table — happens before _resolve_providers
+ // composes the URL, so a malformed value never escapes into a request.
+ code should include("if not _HF_MODEL_ID_PATTERN.match(")
+ code should include("raise ValueError(")
+ code should include("Invalid Hugging Face model ID")
+ }
+
+ it should "not leak raw user-input strings into the generated Python source" in {
+ // Sentinel value chosen to be distinctive and non-overlapping with anything
+ // else in the template. If our encoding regressed back to raw literals
+ // (e.g. `MODEL_ID = "MARKER_zXyq42"`), this assertion would fail.
+ val marker = "MARKER_zXyq42"
+ val code =
+ makeDesc(modelId = marker, promptColumn = marker, token = marker).generatePythonCode()
+ code should not include marker
+ }
+
+ it should "clamp maxNewTokens into the 1-4096 range" in {
+ makeDesc(maxNewTokens = -5).generatePythonCode() should include(
+ "self.MAX_NEW_TOKENS = 1"
+ )
+ makeDesc(maxNewTokens = 99999).generatePythonCode() should include(
+ "self.MAX_NEW_TOKENS = 4096"
+ )
+ }
+
+ it should "clamp temperature into the 0.0-2.0 range" in {
+ makeDesc(temperature = -1.0).generatePythonCode() should include(
+ "self.TEMPERATURE = 0.0"
+ )
+ makeDesc(temperature = 5.0).generatePythonCode() should include(
+ "self.TEMPERATURE = 2.0"
+ )
+ }
+
+ it should "tolerate null @JsonProperty values and fall back to safe defaults" in {
+ // Every user-input field can land as null when the JSON deserializer is
+ // handed a workflow that omits the field. generatePythonCode must not
+ // throw on any combination — and the generated Python must still parse.
+ val desc = new HuggingFaceInferenceOpDesc()
+ desc.hfApiToken = null
+ desc.modelId = null
+ desc.promptColumn = null
+ desc.systemPrompt = null
+ desc.resultColumn = null
+ desc.task = null
+ desc.maxNewTokens = null
+ desc.temperature = null
+ val code = desc.generatePythonCode()
+ code should include("class ProcessTableOperator(UDFTableOperator):")
+ code should include("def open(self):")
+ // System-prompt default is the empty-string sentinel (no fallback string
+ // injected) but the operator class still initializes the constant.
+ code should include("self.SYSTEM_PROMPT = ")
+ // maxNewTokens null path defaults to 256.
+ code should include("self.MAX_NEW_TOKENS = 256")
+ // temperature null path defaults to 0.7.
+ code should include("self.TEMPERATURE = 0.7")
+ }
+
+ "TextGenCodegen" should "advertise text-generation as its canonical task" in {
+ TextGenCodegen.task shouldBe "text-generation"
+ }
+
+ it should
+ "emit payload and parse snippets that don't depend on the CodegenContext" in {
+ // For text-generation, the codegen's only inputs to Python are static
+ // strings referencing self.* attributes — exercising both methods
+ // confirms they don't accidentally consume ctx fields (a future
+ // refactor regression would surface here).
+ val ctx = CodegenContext(
+ hfApiToken = "irrelevant",
+ modelId = "irrelevant",
+ promptColumn = "irrelevant",
+ resultColumn = "irrelevant",
+ task = "irrelevant",
+ systemPrompt = "irrelevant",
+ safeMaxTokens = 0,
+ safeTemp = 0.0
+ )
+ TextGenCodegen.payloadPython(ctx) should include("self.MODEL_ID")
+ TextGenCodegen.parsePython(ctx) should include("""body["choices"][0]["message"]["content"]""")
+ }
+
+ "getOutputSchemas" should "add the result column as a STRING to the inherited schema" in {
+ val desc = makeDesc(resultColumn = "answer")
+ val inputSchema = Schema().add("prompt", AttributeType.STRING)
+ val out = desc.getOutputSchemas(Map(PortIdentity(0) -> inputSchema))
+ val outSchema = out(desc.operatorInfo.outputPorts.head.id)
+ outSchema.getAttributeNames.contains("prompt") shouldBe true
+ outSchema.getAttributeNames.contains("answer") shouldBe true
+ outSchema.getAttribute("answer").getType shouldBe AttributeType.STRING
+ }
+
+ it should "fall back to the default 'hf_response' name when resultColumn is empty" in {
+ val desc = makeDesc(resultColumn = "")
+ val inputSchema = Schema().add("prompt", AttributeType.STRING)
+ val out = desc.getOutputSchemas(Map(PortIdentity(0) -> inputSchema))
+ val outSchema = out(desc.operatorInfo.outputPorts.head.id)
+ outSchema.getAttributeNames.contains("hf_response") shouldBe true
+ }
+}
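
The clamping and null-default tests in the spec above pin down a small numeric contract: `maxNewTokens` lands in 1 to 4096 with a default of 256, and `temperature` lands in 0.0 to 2.0 with a default of 0.7. The sketch below restates that contract in Python for illustration only. In the PR the clamping is presumably done on the Scala side before the constants are emitted, and the helper names `safe_max_tokens` and `safe_temp` are hypothetical (chosen to echo the `safeMaxTokens` / `safeTemp` fields of `CodegenContext`).

```python
from typing import Optional


def safe_max_tokens(value: Optional[int]) -> int:
    """Clamp into [1, 4096]; a missing value falls back to 256."""
    if value is None:
        return 256
    return max(1, min(4096, value))


def safe_temp(value: Optional[float]) -> float:
    """Clamp into [0.0, 2.0]; a missing value falls back to 0.7."""
    if value is None:
        return 0.7
    return max(0.0, min(2.0, value))
```

With these helpers, the inputs used in the spec (`-5`, `99999`, `-1.0`, `5.0`, and null) map to `1`, `4096`, `0.0`, `2.0`, and the defaults respectively.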