This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 439ea72e46 feat(huggingface): add audio and media generation tasks (#5570)
439ea72e46 is described below
commit 439ea72e46b78aec7f71e8889225f2c90942a2c2
Author: Anish Shivamurthy <[email protected]>
AuthorDate: Mon Jun 22 15:10:55 2026 -0700
feat(huggingface): add audio and media generation tasks (#5570)
## What changes were proposed in this PR?
Adds the audio and media-generation task families — 5 HF pipeline tasks
— as new `TaskCodegen`s plugged into the dispatcher established by the
text-generation PR:
- audio tasks: `automatic-speech-recognition`, `audio-classification`, `text-to-speech`
- media-generation tasks: `text-to-image`, `text-to-video`
`codegen/AudioTaskCodegen.scala` supplies the per-task payload + parse
Python branches for the 3 audio tasks.
`codegen/MediaGenCodegen.scala` supplies the per-task payload + parse
Python branches for the 2 media-generation tasks.
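Each parse branch checks the provider's JSON body for several known shapes and falls back to the raw JSON if none match. The Python sketch below restates the `text-to-image` branch of `MediaGenCodegen` as a standalone function so the order of checks is easy to follow. `url_to_data_url` is a caller-supplied stand-in, assumed here for illustration, for the generated `self._url_to_data_url` helper; the real helper fetches the URL through `_fetch_remote_url`.

```python
import json
from typing import Callable


def parse_text_to_image(body, url_to_data_url: Callable[[str], str]) -> str:
    """Normalize a text-to-image JSON body to a data URL, checking the same
    shapes, in the same order, as the MediaGenCodegen parse branch."""
    if isinstance(body, dict):
        # {"output": "https://..."} or {"output": ["https://...", ...]}
        if "output" in body:
            out = body["output"]
            url = out[0] if isinstance(out, list) else out
            if isinstance(url, str) and url.startswith("http"):
                return url_to_data_url(url)
        # {"images": [{"url": "..."}]}
        if "images" in body:
            images = body["images"]
            if images and isinstance(images[0], dict) and "url" in images[0]:
                return url_to_data_url(images[0]["url"])
        if "data" in body:
            data = body["data"]
            # {"data": {"outputs": ["https://..."]}}
            if isinstance(data, dict) and "outputs" in data:
                outputs = data["outputs"]
                if outputs and isinstance(outputs[0], str) and outputs[0].startswith("http"):
                    return url_to_data_url(outputs[0])
            # OpenAI-style {"data": [{"b64_json": "..."}]} or {"data": [{"url": "..."}]}
            if isinstance(data, list) and data and isinstance(data[0], dict):
                if "b64_json" in data[0]:
                    return f"data:image/png;base64,{data[0]['b64_json']}"
                if "url" in data[0]:
                    return url_to_data_url(data[0]["url"])
    # Unknown shape: hand the raw JSON to the result column.
    return json.dumps(body)
```

The `text-to-speech` and `text-to-video` branches follow the same pattern with different keys (`audio`, `video`) and MIME prefixes.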
`CodegenContext` is extended with `audioInput` + `inputAudioColumn`
(`EncodableString`).
`HuggingFaceInferenceOpDesc.scala` gains 2 new `@JsonProperty` fields
and registers `AudioTaskCodegen` + `MediaGenCodegen` in the dispatcher.
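The dispatcher maps each task string to a codegen, built by iterating each family's `tasks` set. Below is a rough Python analogue. The family names are plain-string placeholders for the Scala `TaskCodegen` objects, and the task lists are copied from the two new codegens.

```python
from typing import Dict, Iterable, Mapping

# Task strings copied from AudioTaskCodegen.tasks and MediaGenCodegen.tasks.
AUDIO_TASKS = ("automatic-speech-recognition", "audio-classification", "text-to-speech")
MEDIA_GEN_TASKS = ("text-to-image", "text-to-video")


def build_dispatcher(families: Mapping[str, Iterable[str]]) -> Dict[str, str]:
    """Map each task string to the family that registered it.

    Families are registered in iteration order, and a later family overwrites
    an earlier one for the same task, like `byTask += (t -> codegen)` on a
    Scala mutable Map.
    """
    by_task: Dict[str, str] = {}
    for family, tasks in families.items():
        for task in tasks:
            by_task[task] = family
    return by_task


# "audio" and "media" are placeholder labels for the Scala codegen objects.
dispatcher = build_dispatcher({"audio": AUDIO_TASKS, "media": MEDIA_GEN_TASKS})
```

Later registrations silently overwrite earlier ones, so accidental overlap between families goes unnoticed. To catch it, compare the size of the map against the sum of the family sizes.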
`PythonCodegenBase.scala` grows to host the shared audio/media
infrastructure:
- Audio task-family tuple (`audio_only_tasks`) in `process_table`.
- Per-row audio-byte resolution from upload or column input.
- Raw binary request handling for `automatic-speech-recognition` and
`audio-classification`.
- JSON payload handling for `text-to-speech`.
- Provider-specific routing for media generation and audio generation
through `_call_provider`, including OpenAI-compatible image/audio
endpoints where supported.
- Response parsing for audio/media outputs, including data-URL
conversion for generated media URLs.
- Media helper support for converting remote URLs into `data:image/...`,
`data:audio/...`, or `data:video/...` URLs where needed.
- Hardened audio input loading to match the image-input path: uploaded
audio is accepted as a data URL, remote audio is fetched through the
existing HTTPS-only `_fetch_remote_url` helper, and arbitrary
worker-local file paths are no longer read.
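The audio-input rules in the last bullet, together with the content-type lookup used for uploaded audio, can be sketched in plain Python. This is an illustration under stated assumptions, not the generated code:

- `fetch` stands in for the HTTPS-only `_fetch_remote_url` helper. It is assumed to return a `(content_type, bytes)` pair, matching how the generated code unpacks it.
- The extension table is an abbreviated copy of the one in `_get_audio_content_type`.

```python
import base64
import os
from typing import Callable, Tuple
from urllib.parse import urlparse

# Hypothetical stand-in for the generated `_fetch_remote_url` helper: takes a
# URL and returns (content_type, bytes). In the operator, that helper enforces
# HTTPS-only fetching and a size cap; this sketch does not.
Fetcher = Callable[[str], Tuple[str, bytes]]

# Abbreviated copy of the extension table in `_get_audio_content_type`.
EXTENSION_MIME = {
    ".mp3": "audio/mpeg",
    ".wav": "audio/wav",
    ".flac": "audio/flac",
    ".ogg": "audio/ogg",
    ".m4a": "audio/m4a",
}


def read_audio_input(audio_input: str, fetch: Fetcher) -> bytes:
    """Resolve an uploaded or remote audio reference to raw bytes."""
    audio_input = (audio_input or "").strip()
    if audio_input.startswith("data:"):
        # Uploaded audio arrives as "data:<mime>;base64,<payload>".
        _, encoded = audio_input.split(",", 1)
        return base64.b64decode(encoded)
    if audio_input.startswith(("http://", "https://")):
        # Remote audio is delegated to the fetcher, which owns the scheme check.
        _, data = fetch(audio_input)
        return data
    # Anything else (for example a worker-local path) is rejected, never opened.
    raise ValueError("Unsupported audio input: upload a file or give a public https URL.")


def audio_content_type(audio_input: str) -> str:
    """Pick the Content-Type header for a raw audio request body."""
    audio_input = (audio_input or "").strip().lower()
    if audio_input.startswith("data:"):
        header = audio_input.split(",", 1)[0]  # e.g. "data:audio/wav;base64"
        return header[len("data:"):].split(";", 1)[0]
    path = urlparse(audio_input).path if audio_input.startswith("http") else audio_input
    _, ext = os.path.splitext(path)
    return EXTENSION_MIME.get(ext, "audio/mpeg")  # same default as the generated code
```

Injecting `fetch` keeps the sketch testable without network access. The scheme check is deliberately left to the fetcher, so there is a single place where HTTPS-only is enforced.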
User-input strings continue to flow through `pyb"..."` +
`EncodableString` so they reach Python as
`self.decode_python_template('<base64>')` rather than raw literals.
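Below is a minimal Python sketch of that round trip. It assumes the template value is plain UTF-8 base64; the encoding details of `EncodableString` are not shown in this diff. `encode_for_template` and `decode_template` are hypothetical names. Only the `self.decode_python_template('<base64>')` output shape comes from the text above.

```python
import base64


def encode_for_template(user_value: str) -> str:
    """Hypothetical codegen side: embed a user string as a decode call, not a raw literal."""
    b64 = base64.b64encode(user_value.encode("utf-8")).decode("ascii")
    return f"self.decode_python_template('{b64}')"


def decode_template(b64: str) -> str:
    """Hypothetical runtime side: recover the original string from its base64 form."""
    return base64.b64decode(b64).decode("utf-8")
```

The base64 alphabet contains no quotes, backslashes, or newlines, so no user string can terminate the surrounding Python literal.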
`PythonCodeRawInvalidTextSpec` still passes, with 117/117 descriptors compiling cleanly under py_compile.
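`py_compile` operates on files. For generated source held in memory, the built-in `compile()` performs the same parse-and-compile step without executing anything. Below is a small sketch of such a scan. The `uncompilable` name and the name-to-source mapping are assumptions; they are not the spec's actual structure.

```python
from typing import Dict, List


def uncompilable(sources: Dict[str, str]) -> List[str]:
    """Return one message per source that fails to compile; an empty list means all pass."""
    failures: List[str] = []
    for name, source in sources.items():
        try:
            # Parses and compiles to bytecode; the code object is discarded, nothing runs.
            compile(source, name, "exec")
        except (SyntaxError, ValueError) as e:  # ValueError: null bytes on older Pythons
            failures.append(f"{name}: {type(e).__name__}: {e}")
    return failures
```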
## Any related issues, documentation, or discussions?
- Tracking issue: Add audio and media-generation task families to HuggingFace operator apache#5288
- Closes apache#5288
- Stacked on: Add image task family (`ImageTaskCodegen`) to HuggingFace operator / `hf/03-image-tasks`
- Parent issue: Add Hugging Face inference operator apache#5041
- Closed sibling issue: Add HuggingFaceModelResource REST endpoints for HF operator UI apache#5134
## How was this PR tested?
- `sbt "WorkflowOperator/compile; WorkflowOperator/Test/compile"` clean.
- `sbt scalafmtCheck` clean.
- `sbt "WorkflowOperator/testOnly org.apache.texera.amber.operator.huggingFace.HuggingFaceInferenceOpDescSpec org.apache.texera.amber.util.PythonCodeRawInvalidTextSpec"`: 26 focused tests pass, including HuggingFace audio/media task coverage and the raw Python descriptor scan.
- `sbt "WorkflowOperator/testOnly org.apache.texera.amber.util.PythonCodeRawInvalidTextSpec"`: 117/117 descriptors py_compile cleanly with the new operator code paths, no marker leaks.
- Added regression coverage that audio remote input routes through `_fetch_remote_url(audio_input)` and no longer uses raw `requests.get(audio_input)` or local file reads.
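These regression checks are plain substring assertions on the generated Python. Below is the same idea in Python. The required and forbidden snippets are copied from the Scala spec; the `check_generated` helper itself is hypothetical.

```python
from typing import List, Sequence

# Snippets copied from the HuggingFaceInferenceOpDescSpec assertions.
REQUIRED = ("_, data = self._fetch_remote_url(audio_input)",)
FORBIDDEN = (
    "def _audio_url_to_data_url",
    "requests.get(audio_input",
    "os.path.exists(audio_input)",
    "open(audio_input",
)


def check_generated(
    code: str,
    required: Sequence[str] = REQUIRED,
    forbidden: Sequence[str] = FORBIDDEN,
) -> List[str]:
    """Hypothetical helper: report missing required snippets and present forbidden ones."""
    problems = [f"missing: {s}" for s in required if s not in code]
    problems += [f"forbidden: {s}" for s in forbidden if s in code]
    return problems
```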
## Was this PR authored or co-authored using generative AI tooling?
Yes, co-authored with generative AI tooling (Codex).
---
.../huggingFace/HuggingFaceInferenceOpDesc.scala | 23 ++++-
.../huggingFace/codegen/AudioTaskCodegen.scala | 79 +++++++++++++++
.../huggingFace/codegen/MediaGenCodegen.scala | 78 +++++++++++++++
.../huggingFace/codegen/PythonCodegenBase.scala | 110 +++++++++++++++++++--
.../operator/huggingFace/codegen/TaskCodegen.scala | 4 +-
.../HuggingFaceInferenceOpDescSpec.scala | 101 ++++++++++++++++++-
6 files changed, 384 insertions(+), 11 deletions(-)
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/HuggingFaceInferenceOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/HuggingFaceInferenceOpDesc.scala
index 5f203717d1..f7805266cf 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/HuggingFaceInferenceOpDesc.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/HuggingFaceInferenceOpDesc.scala
@@ -25,8 +25,10 @@ import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PortIdentity}
import org.apache.texera.amber.operator.PythonOperatorDescriptor
import org.apache.texera.amber.operator.huggingFace.codegen.{
+ AudioTaskCodegen,
CodegenContext,
ImageTaskCodegen,
+ MediaGenCodegen,
PythonCodegenBase,
TaskCodegen,
TextGenCodegen
@@ -95,6 +97,17 @@ class HuggingFaceInferenceOpDesc extends PythonOperatorDescriptor {
@AutofillAttributeName
var inputImageColumn: EncodableString = ""
+ @JsonProperty(value = "audioInput", required = false)
+ @JsonSchemaTitle("Audio Upload")
+ @JsonPropertyDescription("Upload audio for Hugging Face audio tasks")
+ var audioInput: EncodableString = ""
+
+ @JsonProperty(value = "inputAudioColumn", required = false)
+ @JsonSchemaTitle("Input Audio Column")
+ @JsonPropertyDescription("Column containing audio data from the input table")
+ @AutofillAttributeName
+ var inputAudioColumn: EncodableString = ""
+
@JsonProperty(
value = "systemPrompt",
required = false,
@@ -138,6 +151,8 @@ class HuggingFaceInferenceOpDesc extends PythonOperatorDescriptor {
val byTask = scala.collection.mutable.Map.empty[String, TaskCodegen]
byTask += (TextGenCodegen.task -> TextGenCodegen)
ImageTaskCodegen.tasks.foreach(t => byTask += (t -> ImageTaskCodegen))
+ AudioTaskCodegen.tasks.foreach(t => byTask += (t -> AudioTaskCodegen))
+ MediaGenCodegen.tasks.foreach(t => byTask += (t -> MediaGenCodegen))
byTask.toMap
}
@@ -181,6 +196,10 @@ class HuggingFaceInferenceOpDesc extends PythonOperatorDescriptor {
if (imageInput == null) "" else imageInput
val safeInputImageColumn: EncodableString =
if (inputImageColumn == null) "" else inputImageColumn
+ val safeAudioInput: EncodableString =
+ if (audioInput == null) "" else audioInput
+ val safeInputAudioColumn: EncodableString =
+ if (inputAudioColumn == null) "" else inputAudioColumn
val ctx = CodegenContext(
hfApiToken = safeToken,
@@ -192,7 +211,9 @@ class HuggingFaceInferenceOpDesc extends PythonOperatorDescriptor {
safeMaxTokens = safeMaxTokens,
safeTemp = safeTemp,
imageInput = safeImageInput,
- inputImageColumn = safeInputImageColumn
+ inputImageColumn = safeInputImageColumn,
+ audioInput = safeAudioInput,
+ inputAudioColumn = safeInputAudioColumn
)
PythonCodegenBase.render(ctx, codegenForTask(safeTask))
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/AudioTaskCodegen.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/AudioTaskCodegen.scala
new file mode 100644
index 0000000000..560244962a
--- /dev/null
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/AudioTaskCodegen.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.huggingFace.codegen
+
+/**
+ * Codegen for Hugging Face audio task families.
+ *
+ * ASR and audio-classification send audio bytes as the raw request body.
+ * Text-to-speech is prompt-driven and sends a JSON payload; its providers
+ * return either audio bytes directly or a JSON envelope pointing to audio.
+ */
+object AudioTaskCodegen extends TaskCodegen {
+
+ override val task: String = "automatic-speech-recognition"
+
+ override val tasks: Set[String] = Set(
+ "automatic-speech-recognition",
+ "audio-classification",
+ "text-to-speech"
+ )
+
+ override def payloadPython(ctx: CodegenContext): String =
+ """ if task in audio_only_tasks:
+ | payload = current_audio_bytes
+ | use_raw_binary_body = True
+ | raw_binary_headers = audio_headers
+ | elif task == "text-to-speech":
+ | payload = {"inputs": prompt_value}""".stripMargin
+
+ override def parsePython(ctx: CodegenContext): String =
+ """ if task == "text-to-speech":
+ | if isinstance(body, dict):
+ | if "output" in body:
+ | out = body["output"]
+ | url = out[0] if isinstance(out, list) else out
+ | if isinstance(url, str) and url.startswith("http"):
+ | return self._url_to_data_url(url)
+ | if "audio" in body:
+ | audio = body["audio"]
+ | if isinstance(audio, dict):
+ | if "url" in audio:
+ | return self._url_to_data_url(audio["url"])
+ | if "b64_json" in audio:
+ | return f"data:audio/mpeg;base64,{audio['b64_json']}"
+ | if "data" in body:
+ | data = body["data"]
+ | if data and isinstance(data[0], dict):
+ | if "url" in data[0]:
+ | return self._url_to_data_url(data[0]["url"])
+ | if "b64_json" in data[0]:
+ | return f"data:audio/mpeg;base64,{data[0]['b64_json']}"
+ | return json.dumps(body)
+ | elif task == "automatic-speech-recognition":
+ | if isinstance(body, dict):
+ | if "text" in body:
+ | return body["text"]
+ | if "generated_text" in body:
+ | return body["generated_text"]
+ | return json.dumps(body)
+ | elif task == "audio-classification":
+ | return json.dumps(body)""".stripMargin
+}
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/MediaGenCodegen.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/MediaGenCodegen.scala
new file mode 100644
index 0000000000..73047da89c
--- /dev/null
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/MediaGenCodegen.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.huggingFace.codegen
+
+/**
+ * Codegen for prompt-driven media generation tasks.
+ *
+ * Providers return media in several shapes: raw bytes, OpenAI-style
+ * b64_json, or URLs. URL responses are normalized to data URLs by the
+ * shared `_url_to_data_url` helper so downstream result rendering receives
+ * a stable string format.
+ */
+object MediaGenCodegen extends TaskCodegen {
+
+ override val task: String = "text-to-image"
+
+ override val tasks: Set[String] = Set(
+ "text-to-image",
+ "text-to-video"
+ )
+
+ override def payloadPython(ctx: CodegenContext): String =
+ """ payload = {"inputs": prompt_value}""".stripMargin
+
+ override def parsePython(ctx: CodegenContext): String =
+ """ if task == "text-to-image":
+ | if isinstance(body, dict):
+ | if "output" in body:
+ | out = body["output"]
+ | url = out[0] if isinstance(out, list) else out
+ | if isinstance(url, str) and url.startswith("http"):
+ | return self._url_to_data_url(url)
+ | if "images" in body:
+ | images = body["images"]
+ | if images and isinstance(images[0], dict) and "url" in images[0]:
+ | return self._url_to_data_url(images[0]["url"])
+ | if "data" in body:
+ | data = body["data"]
+ | if isinstance(data, dict) and "outputs" in data:
+ | outputs = data["outputs"]
+ | if outputs and isinstance(outputs[0], str) and outputs[0].startswith("http"):
+ | return self._url_to_data_url(outputs[0])
+ | if isinstance(data, list) and data and isinstance(data[0], dict):
+ | if "b64_json" in data[0]:
+ | return f"data:image/png;base64,{data[0]['b64_json']}"
+ | if "url" in data[0]:
+ | return self._url_to_data_url(data[0]["url"])
+ | return json.dumps(body)
+ | elif task == "text-to-video":
+ | if isinstance(body, dict):
+ | if "output" in body:
+ | out = body["output"]
+ | url = out[0] if isinstance(out, list) else out
+ | if isinstance(url, str) and url.startswith("http"):
+ | return self._url_to_data_url(url)
+ | if "video" in body:
+ | video = body["video"]
+ | if isinstance(video, dict) and "url" in video:
+ | return self._url_to_data_url(video["url"])
+ | return json.dumps(body)""".stripMargin
+}
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/PythonCodegenBase.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/PythonCodegenBase.scala
index eac4641c62..8671b9a76a 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/PythonCodegenBase.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/PythonCodegenBase.scala
@@ -57,6 +57,8 @@ object PythonCodegenBase {
val temperature = ctx.safeTemp
val imageInput = ctx.imageInput
val inputImageColumn = ctx.inputImageColumn
+ val audioInput = ctx.audioInput
+ val inputAudioColumn = ctx.inputAudioColumn
pyb"""import os
|import re
|import json
@@ -137,6 +139,8 @@ object PythonCodegenBase {
| self.TEMPERATURE = $temperature
| self.IMAGE_INPUT = $imageInput
| self.INPUT_IMAGE_COLUMN = $inputImageColumn
+ | self.AUDIO_INPUT = $audioInput
+ | self.INPUT_AUDIO_COLUMN = $inputAudioColumn
|
| def _resolve_providers(self, token):
| '''Query the HF Hub API for inference providers serving this model.
@@ -286,7 +290,14 @@ object PythonCodegenBase {
| if provider_name == "replicate":
| url = f"{base}/v1/models/{provider_id}/predictions"
| hdrs = {**json_headers, "Prefer": "wait"}
- | if task == "image-to-image" and img_b64:
+ | if task == "text-to-speech":
+ | inp = {"text": prompt_value}
+ | elif task in ("text-to-image", "text-to-video"):
+ | inp = {"prompt": prompt_value}
+ | elif task in ("automatic-speech-recognition", "audio-classification") and img_b64:
+ | audio_content_type = raw_binary_headers.get("Content-Type", "audio/mpeg")
+ | inp = {"audio": f"data:{audio_content_type};base64,{img_b64}"}
+ | elif task == "image-to-image" and img_b64:
| data_url = f"data:image/png;base64,{img_b64}"
| inp = {"image": data_url, "images": [data_url], "input_image": data_url, "prompt": prompt_value}
| elif img_b64:
@@ -340,6 +351,10 @@ object PythonCodegenBase {
| # Fal-ai: per-model endpoint.
| if provider_name == "fal-ai":
| url = f"{base}/{provider_id}"
+ | if task == "text-to-speech":
+ | return requests.post(url, headers=json_headers, json={"text": prompt_value}, timeout=120)
+ | if task in ("text-to-image", "text-to-video"):
+ | return requests.post(url, headers=json_headers, json={"prompt": prompt_value}, timeout=120)
| if task == "image-to-image" and img_b64:
| data_url = f"data:image/png;base64,{img_b64}"
| return requests.post(url, headers=json_headers, json={"image_url": data_url, "image_urls": [data_url], "prompt": prompt_value}, timeout=120)
@@ -398,6 +413,12 @@ object PythonCodegenBase {
| return poll_resp
|
| if provider_name in self.OPENAI_COMPATIBLE_PROVIDERS:
+ | if task == "text-to-image":
+ | url = f"{base}/v1/images/generations"
+ | return requests.post(url, headers=json_headers, json={"model": provider_id, "prompt": prompt_value}, timeout=120)
+ | if task == "text-to-speech":
+ | url = f"{base}/v1/audio/speech"
+ | return requests.post(url, headers=json_headers, json={"model": provider_id, "input": prompt_value}, timeout=120)
| url = f"{base}/{self.CHAT_ROUTES.get(provider_name, 'v1/chat/completions')}"
| messages = [{"role": "user", "content": prompt_value}]
| if img_b64:
@@ -444,6 +465,7 @@ object PythonCodegenBase {
| image_only_tasks = ("image-classification", "object-detection", "image-segmentation", "image-to-text")
| image_prompt_tasks = ("visual-question-answering", "document-question-answering", "zero-shot-image-classification", "image-text-to-text", "image-to-image")
"image-text-to-text", "image-to-image")
| image_tasks = image_only_tasks + image_prompt_tasks
+ | audio_only_tasks = ("automatic-speech-recognition", "audio-classification")
|
| # --- validate MODEL_ID format before any HF URL is built ---
| if not _HF_MODEL_ID_PATTERN.match(self.MODEL_ID or ""):
@@ -463,8 +485,8 @@ object PythonCodegenBase {
| # --- resolve all available inference providers for this model (tried in order) ---
| providers = self._resolve_providers(token)
|
- | # --- validate prompt column exists (required for non-image tasks) ---
- | if task not in image_tasks:
+ | # --- validate prompt column exists (skipped for image tasks and binary-only audio tasks) ---
+ | if task not in image_tasks and task not in audio_only_tasks:
| assert prompt_col in table.columns, (
| f"Prompt column '{prompt_col}' not found in input table. "
| f"Available columns: {list(table.columns)}"
@@ -484,12 +506,19 @@ object PythonCodegenBase {
| "Authorization": f"Bearer {token}",
| "Content-Type": "application/octet-stream",
| }
- |
| # --- resolve image source (upload or column) for image tasks ---
| has_image_upload = bool(self.IMAGE_INPUT) and bool(str(self.IMAGE_INPUT).strip())
| use_image_column = not has_image_upload and bool(self.INPUT_IMAGE_COLUMN) and self.INPUT_IMAGE_COLUMN in table.columns
| image_bytes = None
| image_error = None
+ | has_audio_upload = bool(self.AUDIO_INPUT) and bool(str(self.AUDIO_INPUT).strip())
+ | use_audio_column = not has_audio_upload and bool(self.INPUT_AUDIO_COLUMN) and self.INPUT_AUDIO_COLUMN in table.columns
+ | audio_headers = {
+ | "Authorization": f"Bearer {token}",
+ | "Content-Type": "application/octet-stream" if use_audio_column else self._get_audio_content_type(),
+ | }
+ | audio_bytes = None
+ | audio_error = None
| if task in image_tasks and not use_image_column:
| if not has_image_upload:
| image_error = "No image source. Set an Input Image Column or upload an image."
@@ -498,15 +527,28 @@ object PythonCodegenBase {
| image_bytes = self._read_image_input()
| except Exception as e:
| image_error = f"Could not read image input ({type(e).__name__}: {e})"
+ | if task in audio_only_tasks and not use_audio_column:
+ | if not has_audio_upload:
+ | audio_error = "No audio source. Set an Input Audio Column or upload audio."
+ | else:
+ | try:
+ | audio_bytes = self._read_audio_input()
+ | except Exception as e:
+ | audio_error = f"Could not read audio input ({type(e).__name__}: {e})"
|
| results = []
| for idx, row in table.iterrows():
| if image_error is not None:
| results.append(self._format_error("Image task configuration error", image_error))
| continue
+ | if audio_error is not None:
+ | results.append(self._format_error("Audio task configuration error", audio_error))
+ | continue
|
| if task in image_only_tasks:
| prompt_value = ""
+ | elif task in audio_only_tasks:
+ | prompt_value = ""
| elif task in image_prompt_tasks and prompt_col not in table.columns:
| prompt_value = "What is shown in this image?"
| else:
@@ -529,6 +571,18 @@ object PythonCodegenBase {
| results.append(self._format_error("Image data error", f"Row {idx}: {type(e).__name__}: {e}"))
| continue
|
+ | # --- resolve per-row audio bytes from column ---
+ | current_audio_bytes = audio_bytes
+ | if task in audio_only_tasks and use_audio_column:
+ | try:
+ | current_audio_bytes = self._read_binary_value(row[self.INPUT_AUDIO_COLUMN])
+ | if current_audio_bytes is None:
+ | results.append(self._format_error("Audio data error", f"Row {idx}: audio column is empty"))
+ | continue
+ | except Exception as e:
+ | results.append(self._format_error("Audio data error", f"Row {idx}: {type(e).__name__}: {e}"))
+ | continue
+ |
| # --- build task-specific payload (provided by per-task codegen) ---
| use_raw_binary_body = False
| raw_binary_headers = image_headers
@@ -576,6 +630,10 @@ object PythonCodegenBase {
| b64 = base64.b64encode(resp.content).decode("utf-8")
| results.append(f"data:{content_type};base64,{b64}")
| continue
+ | if content_type.startswith("audio/") or content_type.startswith("video/"):
+ | b64 = base64.b64encode(resp.content).decode("utf-8")
+ | results.append(f"data:{content_type};base64,{b64}")
+ | continue
|
| try:
| body = resp.json()
@@ -702,6 +760,22 @@ object PythonCodegenBase {
| def _image_input_as_base64(self, image_bytes):
| return base64.b64encode(image_bytes).decode("utf-8")
|
+ | def _read_audio_input(self):
+ | audio_input = str(self.AUDIO_INPUT or "").strip()
+ | if audio_input.startswith("data:"):
+ | _, encoded = audio_input.split(",", 1)
+ | return base64.b64decode(encoded)
+ | if audio_input.startswith("http://") or audio_input.startswith("https://"):
+ | _, data = self._fetch_remote_url(audio_input)
+ | return data
+ | # Reading arbitrary worker-filesystem paths is intentionally NOT
+ | # supported: uploaded audio arrives as a data URL and remote audio
+ | # must be fetched through the hardened https-only helper above.
+ | raise ValueError(
+ | "Unsupported audio input. Upload an audio file (sent as a data URL) "
+ | "or provide a public https audio URL."
+ | )
+ |
| def _read_binary_value(self, value):
| if value is None:
| return None
@@ -821,6 +895,30 @@ object PythonCodegenBase {
| return text[start_pos:pos], pos
| return None, start_pos
|
+ | def _get_audio_content_type(self):
+ | audio_input = str(self.AUDIO_INPUT or "").strip().lower()
+ | if audio_input.startswith("data:"):
+ | header = audio_input.split(",", 1)[0]
+ | if ";" in header:
+ | return header[5:header.index(";")]
+ | return header[5:]
+ | extension_map = {
+ | ".mp3": "audio/mpeg",
+ | ".mpeg": "audio/mpeg",
+ | ".wav": "audio/wav",
+ | ".flac": "audio/flac",
+ | ".ogg": "audio/ogg",
+ | ".oga": "audio/ogg",
+ | ".webm": "audio/webm",
+ | ".opus": "audio/webm;codecs=opus",
+ | ".amr": "audio/amr",
+ | ".m4a": "audio/m4a",
+ | }
+ | from urllib.parse import urlparse as _urlparse
+ | path = _urlparse(audio_input).path if audio_input.startswith("http") else audio_input
+ | _, ext = os.path.splitext(path)
+ | return extension_map.get(ext, "audio/mpeg")
+ |
| def _url_to_data_url(self, url):
| '''Fetch a URL and return a data URL with the correct MIME type.
| Fetched via _fetch_remote_url so a malicious/compromised provider
@@ -831,12 +929,12 @@ object PythonCodegenBase {
| if not content_type or content_type == "application/octet-stream":
| from urllib.parse import urlparse as _urlparse
| ext = os.path.splitext(_urlparse(url).path.lower())[1]
- | mime_map = {".png": "image/png", ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".gif": "image/gif", ".webp": "image/webp", ".svg": "image/svg+xml", ".mp4": "video/mp4", ".webm": "video/webm"}
+ | mime_map = {".png": "image/png", ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".gif": "image/gif", ".webp": "image/webp", ".svg": "image/svg+xml", ".mp3": "audio/mpeg", ".mpeg": "audio/mpeg", ".wav": "audio/wav", ".flac": "audio/flac", ".ogg": "audio/ogg", ".oga": "audio/ogg", ".m4a": "audio/m4a", ".mp4": "video/mp4", ".webm": "video/webm"}
| guessed = mime_map.get(ext, "")
| if guessed:
| content_type = guessed
| else:
- | task_mime = {"image-to-image": "image/png"}
+ | task_mime = {"image-to-image": "image/png", "text-to-image": "image/png", "text-to-video": "video/mp4", "text-to-speech": "audio/mpeg"}
| content_type = task_mime.get(self.TASK, "application/octet-stream")
| b64 = base64.b64encode(data).decode("utf-8")
| return f"data:{content_type};base64,{b64}"
diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/TaskCodegen.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/TaskCodegen.scala
index 299ea5d6e3..80bbcc58fc 100644
--- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/TaskCodegen.scala
+++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/huggingFace/codegen/TaskCodegen.scala
@@ -39,7 +39,9 @@ final case class CodegenContext(
safeMaxTokens: Int,
safeTemp: Double,
imageInput: EncodableString = "",
- inputImageColumn: EncodableString = ""
+ inputImageColumn: EncodableString = "",
+ audioInput: EncodableString = "",
+ inputAudioColumn: EncodableString = ""
)
/**
diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/huggingFace/HuggingFaceInferenceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/huggingFace/HuggingFaceInferenceOpDescSpec.scala
index 0d6e09302f..eb728945f3 100644
--- a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/huggingFace/HuggingFaceInferenceOpDescSpec.scala
+++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/huggingFace/HuggingFaceInferenceOpDescSpec.scala
@@ -21,7 +21,12 @@ package org.apache.texera.amber.operator.huggingFace
import org.apache.texera.amber.core.tuple.{AttributeType, Schema}
import org.apache.texera.amber.core.workflow.PortIdentity
-import org.apache.texera.amber.operator.huggingFace.codegen.{CodegenContext, TextGenCodegen}
+import org.apache.texera.amber.operator.huggingFace.codegen.{
+ AudioTaskCodegen,
+ CodegenContext,
+ MediaGenCodegen,
+ TextGenCodegen
+}
import org.apache.texera.amber.operator.metadata.OperatorGroupConstants
import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableString
import org.scalatest.flatspec.AnyFlatSpec
@@ -39,7 +44,9 @@ class HuggingFaceInferenceOpDescSpec extends AnyFlatSpec with Matchers {
temperature: Double = 0.7,
resultColumn: EncodableString = "hf_response",
imageInput: EncodableString = "",
- inputImageColumn: EncodableString = ""
+ inputImageColumn: EncodableString = "",
+ audioInput: EncodableString = "",
+ inputAudioColumn: EncodableString = ""
): HuggingFaceInferenceOpDesc = {
val desc = new HuggingFaceInferenceOpDesc()
desc.hfApiToken = token
@@ -52,6 +59,8 @@ class HuggingFaceInferenceOpDescSpec extends AnyFlatSpec with Matchers {
desc.resultColumn = resultColumn
desc.imageInput = imageInput
desc.inputImageColumn = inputImageColumn
+ desc.audioInput = audioInput
+ desc.inputAudioColumn = inputAudioColumn
desc
}
@@ -152,6 +161,8 @@ class HuggingFaceInferenceOpDescSpec extends AnyFlatSpec with Matchers {
desc.temperature = null
desc.imageInput = null
desc.inputImageColumn = null
+ desc.audioInput = null
+ desc.inputAudioColumn = null
val code = desc.generatePythonCode()
code should include("class ProcessTableOperator(UDFTableOperator):")
code should include("def open(self):")
@@ -272,10 +283,15 @@ class HuggingFaceInferenceOpDescSpec extends AnyFlatSpec with Matchers {
// size cap
code should include("MAX_REMOTE_FETCH_BYTES")
code should include("Remote file exceeds the")
- // all three fetch sites route through the helper (no raw requests.get on these URLs)
+ // all remote fetch sites route through the helper (no raw requests.get on these URLs)
code should include("_, data = self._fetch_remote_url(image_input)")
+ code should include("_, data = self._fetch_remote_url(audio_input)")
code should include("_, data = self._fetch_remote_url(val)")
code should include("raw_content_type, data = self._fetch_remote_url(url)")
+ code should not include "def _audio_url_to_data_url"
+ code should not include "requests.get(audio_input"
+ code should not include "os.path.exists(audio_input)"
+ code should not include "open(audio_input"
}
it should "treat pandas NA sentinels (NaN, pd.NA, NaT) as missing in _read_binary_value" in {
@@ -402,6 +418,85 @@ class HuggingFaceInferenceOpDescSpec extends AnyFlatSpec with Matchers {
}
}
+ "audio task family" should
+ "route ASR and audio-classification through AudioTaskCodegen as raw binary payloads" in {
+ val code =
+ makeDesc(task = "automatic-speech-recognition", inputAudioColumn = "audio")
+ .generatePythonCode()
+ code should include("self.AUDIO_INPUT = ")
+ code should include("self.INPUT_AUDIO_COLUMN = ")
+ code should include(
+ """audio_only_tasks = ("automatic-speech-recognition", "audio-classification")"""
+ )
+ code should include("payload = current_audio_bytes")
+ code should include("raw_binary_headers = audio_headers")
+ code should include("self._read_audio_input()")
+ code should include(
+ """"Content-Type": "application/octet-stream" if use_audio_column else self._get_audio_content_type()"""
+ )
+ code should include(
+ """path = _urlparse(audio_input).path if audio_input.startswith("http") else audio_input"""
+ )
+ code should include(
+ """audio_content_type = raw_binary_headers.get("Content-Type", "audio/mpeg")"""
+ )
+ code should include(
+ """elif task in ("automatic-speech-recognition", "audio-classification") and img_b64:"""
+ )
+ code should not include "data:audio/wav;base64"
+ code should include(
+ """if content_type.startswith("audio/") or content_type.startswith("video/"):"""
+ )
+ }
+
+ it should "route text-to-speech through AudioTaskCodegen and normalize audio URLs" in {
+ val code = makeDesc(task = "text-to-speech").generatePythonCode()
+ code should include("""elif task == "text-to-speech":""")
+ code should include("""payload = {"inputs": prompt_value}""")
+ code should include("self._url_to_data_url(")
+ code should include(""""text-to-speech": "audio/mpeg"""")
+ code should include("""".m4a": "audio/m4a"""")
+ code should not include "_audio_url_to_data_url"
+ code should include("data:audio/mpeg;base64")
+ }
+
+ it should "register all audio task strings under the dispatcher" in {
+ AudioTaskCodegen.tasks should contain allOf (
+ "automatic-speech-recognition",
+ "audio-classification",
+ "text-to-speech"
+ )
+ AudioTaskCodegen.tasks.foreach { t =>
+ val code = makeDesc(task = t, inputAudioColumn = "audio").generatePythonCode()
+ code should include("if task in audio_only_tasks:")
+ }
+ }
+
+ "media generation task family" should
+ "route text-to-image through MediaGenCodegen and parse URL or b64 responses as data URLs" in {
+ val code = makeDesc(task = "text-to-image").generatePythonCode()
+ code should include("if task not in image_tasks and task not in audio_only_tasks:")
+ code should include("""payload = {"inputs": prompt_value}""")
+ code should include("""if task == "text-to-image":""")
+ code should include("self._url_to_data_url(")
+ code should include("data:image/png;base64")
+ }
+
+ it should "route text-to-video through MediaGenCodegen and normalize remote video URLs" in {
+ val code = makeDesc(task = "text-to-video").generatePythonCode()
+ code should include("""elif task == "text-to-video":""")
+ code should include("self._url_to_data_url(")
+ code should include("video/mp4")
+ }
+
+ it should "register all media generation task strings under the dispatcher" in {
+ MediaGenCodegen.tasks should contain allOf ("text-to-image", "text-to-video")
+ MediaGenCodegen.tasks.foreach { t =>
+ val code = makeDesc(task = t).generatePythonCode()
+ code should include("""payload = {"inputs": prompt_value}""")
+ }
+ }
+
"getOutputSchemas" should "add the result column as a STRING to the inherited schema" in {
val desc = makeDesc(resultColumn = "answer")
val inputSchema = Schema().add("prompt", AttributeType.STRING)