This is an automated email from the ASF dual-hosted git repository.
mengw15 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new b5a18a0f28 fix: pass different environment variables to computing unit
and Python worker based on the catalog type (#4478)
b5a18a0f28 is described below
commit b5a18a0f28e899c9528fb874357d092ea3cf2c61
Author: Meng Wang <[email protected]>
AuthorDate: Thu Apr 23 16:18:44 2026 -0700
fix: pass different environment variables to computing unit and Python
worker based on the catalog type (#4478)
### What changes were proposed in this PR?
When the active Iceberg catalog type is `rest`, only the REST catalog settings
are forwarded across process boundaries; when it is `postgres`, only the
Postgres credentials are forwarded. Previously both backends' credentials were
forwarded unconditionally, exposing unused credentials to consumers that never
read them.
Two transmission paths are gated:
1. K8s CU pod env vars — ComputingUnitManagingResource.scala: extract an
icebergEnvironmentVariables method that branches on
StorageConfig.icebergCatalogType and emits only the active backend's env keys
(STORAGE_ICEBERG_CATALOG_REST_* for rest; STORAGE_ICEBERG_CATALOG_POSTGRES_*
for postgres). The catalog-type variable itself is always emitted; any other
catalog type gets no backend-specific keys.
2. Python UDF subprocess argv — PythonWorkflowWorker.scala: the 5 Iceberg
connection arguments passed to texera_run_python_worker.py after the catalog
type (Postgres URI, username, and password; REST URI and warehouse name) are
gated by catalog type. The catalog type itself is always passed. The inactive
backend's slots are passed as empty strings to keep the positional-arg layout
stable, so the Python entrypoint needs no change (it already only consumes the
active backend's fields via iceberg_catalog_instance.py).
### Any related issues, documentation, discussions?
Closes #4477
### How was this PR tested?
- sbt ComputingUnitManagingService/compile
WorkflowExecutionService/compile passes.
- Manual verification
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
---------
Co-authored-by: Chen Li <[email protected]>
---
.../pythonworker/PythonWorkflowWorker.scala | 13 +--
.../resource/ComputingUnitManagingResource.scala | 98 ++++++++++++----------
2 files changed, 64 insertions(+), 47 deletions(-)
diff --git
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
index d2bc5f5025..32e417f3c0 100644
---
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
+++
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
@@ -168,6 +168,9 @@ class PythonWorkflowWorker(
private def startPythonProcess(): Unit = {
val udfEntryScriptPath: String =
pythonSrcDirectory.resolve("texera_run_python_worker.py").toString
+ // Set the Iceberg related arguments based on the catalog type.
+ val isPostgres = StorageConfig.icebergCatalogType == "postgres"
+ val isRest = StorageConfig.icebergCatalogType == "rest"
pythonServerProcess = Process(
Seq(
if (pythonENVPath.isEmpty) "python3"
@@ -179,11 +182,11 @@ class PythonWorkflowWorker(
UdfConfig.pythonLogStreamHandlerLevel,
RENVPath,
StorageConfig.icebergCatalogType,
- StorageConfig.icebergPostgresCatalogUriWithoutScheme,
- StorageConfig.icebergPostgresCatalogUsername,
- StorageConfig.icebergPostgresCatalogPassword,
- StorageConfig.icebergRESTCatalogUri,
- StorageConfig.icebergRESTCatalogWarehouseName,
+ if (isPostgres) StorageConfig.icebergPostgresCatalogUriWithoutScheme
else "",
+ if (isPostgres) StorageConfig.icebergPostgresCatalogUsername else "",
+ if (isPostgres) StorageConfig.icebergPostgresCatalogPassword else "",
+ if (isRest) StorageConfig.icebergRESTCatalogUri else "",
+ if (isRest) StorageConfig.icebergRESTCatalogWarehouseName else "",
StorageConfig.icebergTableResultNamespace,
StorageConfig.fileStorageDirectoryPath.toString,
StorageConfig.icebergTableCommitBatchSize.toString,
diff --git
a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
index bc4aa32ef2..2d1b312b0e 100644
---
a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
+++
b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
@@ -65,49 +65,63 @@ object ComputingUnitManagingResource {
.getInstance()
.createDSLContext()
+ private def icebergEnvironmentVariables: Map[String, Any] = {
+ val base = Map[String, Any](
+ EnvironmentalVariable.ENV_ICEBERG_CATALOG_TYPE ->
StorageConfig.icebergCatalogType
+ )
+ StorageConfig.icebergCatalogType match {
+ case "rest" =>
+ base ++ Map(
+ EnvironmentalVariable.ENV_ICEBERG_CATALOG_REST_URI ->
StorageConfig.icebergRESTCatalogUri,
+ EnvironmentalVariable.ENV_ICEBERG_CATALOG_REST_WAREHOUSE_NAME ->
StorageConfig.icebergRESTCatalogWarehouseName
+ )
+ case "postgres" =>
+ base ++ Map(
+
EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME ->
StorageConfig.icebergPostgresCatalogUriWithoutScheme,
+ EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_USERNAME ->
StorageConfig.icebergPostgresCatalogUsername,
+ EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_PASSWORD ->
StorageConfig.icebergPostgresCatalogPassword
+ )
+ case _ => base
+ }
+ }
+
// Environment variables passed to the created computing unit(pod)
- private lazy val computingUnitEnvironmentVariables: Map[String, Any] = Map(
- // Variables for saving results to Iceberg
- EnvironmentalVariable.ENV_ICEBERG_CATALOG_TYPE ->
StorageConfig.icebergCatalogType,
- EnvironmentalVariable.ENV_ICEBERG_CATALOG_REST_URI ->
StorageConfig.icebergRESTCatalogUri,
- EnvironmentalVariable.ENV_ICEBERG_CATALOG_REST_WAREHOUSE_NAME ->
StorageConfig.icebergRESTCatalogWarehouseName,
- EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME ->
StorageConfig.icebergPostgresCatalogUriWithoutScheme,
- EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_USERNAME ->
StorageConfig.icebergPostgresCatalogUsername,
- EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_PASSWORD ->
StorageConfig.icebergPostgresCatalogPassword,
- // Variables for saving the metadata of the results, i.e. URIs of
results/stats
- EnvironmentalVariable.ENV_JDBC_URL -> StorageConfig.jdbcUrl,
- EnvironmentalVariable.ENV_JDBC_USERNAME -> StorageConfig.jdbcUsername,
- EnvironmentalVariable.ENV_JDBC_PASSWORD -> StorageConfig.jdbcPassword,
- // Variables for reading files & exporting results
- // LakeFS endpoint is passed to CU to make CU work in dev mode(using
localhost & using default LakeFS credentials)
- // LakeFS credentials should NOT be passed to CU
- EnvironmentalVariable.ENV_LAKEFS_ENDPOINT -> StorageConfig.lakefsEndpoint,
- // S3 variables are passed to CU for R UDF large binary support
- EnvironmentalVariable.ENV_S3_ENDPOINT -> StorageConfig.s3Endpoint,
- EnvironmentalVariable.ENV_S3_REGION -> StorageConfig.s3Region,
- EnvironmentalVariable.ENV_S3_AUTH_USERNAME -> StorageConfig.s3Username,
- EnvironmentalVariable.ENV_S3_AUTH_PASSWORD -> StorageConfig.s3Password,
- EnvironmentalVariable.ENV_FILE_SERVICE_GET_PRESIGNED_URL_ENDPOINT ->
EnvironmentalVariable
- .get(EnvironmentalVariable.ENV_FILE_SERVICE_GET_PRESIGNED_URL_ENDPOINT)
- .get,
- EnvironmentalVariable.ENV_FILE_SERVICE_UPLOAD_ONE_FILE_TO_DATASET_ENDPOINT
-> EnvironmentalVariable
-
.get(EnvironmentalVariable.ENV_FILE_SERVICE_UPLOAD_ONE_FILE_TO_DATASET_ENDPOINT)
- .get,
- // Variables for amber setting
- // TODO: use AmberConfig for the following items. Currently AmberConfig is
only accessible in workflow-executing-service
-
EnvironmentalVariable.ENV_SCHEDULE_GENERATOR_ENABLE_COST_BASED_SCHEDULE_GENERATOR
-> EnvironmentalVariable
-
.get(EnvironmentalVariable.ENV_SCHEDULE_GENERATOR_ENABLE_COST_BASED_SCHEDULE_GENERATOR)
- .get,
- EnvironmentalVariable.ENV_USER_SYS_ENABLED -> EnvironmentalVariable
- .get(EnvironmentalVariable.ENV_USER_SYS_ENABLED)
- .get,
- EnvironmentalVariable.ENV_MAX_WORKFLOW_WEBSOCKET_REQUEST_PAYLOAD_SIZE_KB
-> EnvironmentalVariable
-
.get(EnvironmentalVariable.ENV_MAX_WORKFLOW_WEBSOCKET_REQUEST_PAYLOAD_SIZE_KB)
- .get,
- EnvironmentalVariable.ENV_AUTH_JWT_SECRET -> EnvironmentalVariable
- .get(EnvironmentalVariable.ENV_AUTH_JWT_SECRET)
- .get
- )
+ private lazy val computingUnitEnvironmentVariables: Map[String, Any] =
+ icebergEnvironmentVariables ++ Map(
+ // Variables for saving the metadata of the results, i.e. URIs of
results/stats
+ EnvironmentalVariable.ENV_JDBC_URL -> StorageConfig.jdbcUrl,
+ EnvironmentalVariable.ENV_JDBC_USERNAME -> StorageConfig.jdbcUsername,
+ EnvironmentalVariable.ENV_JDBC_PASSWORD -> StorageConfig.jdbcPassword,
+ // Variables for reading files & exporting results
+ // LakeFS endpoint is passed to CU to make CU work in dev mode(using
localhost & using default LakeFS credentials)
+ // LakeFS credentials should NOT be passed to CU
+ EnvironmentalVariable.ENV_LAKEFS_ENDPOINT ->
StorageConfig.lakefsEndpoint,
+ // S3 variables are passed to CU for R UDF large binary support
+ EnvironmentalVariable.ENV_S3_ENDPOINT -> StorageConfig.s3Endpoint,
+ EnvironmentalVariable.ENV_S3_REGION -> StorageConfig.s3Region,
+ EnvironmentalVariable.ENV_S3_AUTH_USERNAME -> StorageConfig.s3Username,
+ EnvironmentalVariable.ENV_S3_AUTH_PASSWORD -> StorageConfig.s3Password,
+ EnvironmentalVariable.ENV_FILE_SERVICE_GET_PRESIGNED_URL_ENDPOINT ->
EnvironmentalVariable
+ .get(EnvironmentalVariable.ENV_FILE_SERVICE_GET_PRESIGNED_URL_ENDPOINT)
+ .get,
+
EnvironmentalVariable.ENV_FILE_SERVICE_UPLOAD_ONE_FILE_TO_DATASET_ENDPOINT ->
EnvironmentalVariable
+
.get(EnvironmentalVariable.ENV_FILE_SERVICE_UPLOAD_ONE_FILE_TO_DATASET_ENDPOINT)
+ .get,
+ // Variables for amber setting
+ // TODO: use AmberConfig for the following items. Currently AmberConfig
is only accessible in workflow-executing-service
+
EnvironmentalVariable.ENV_SCHEDULE_GENERATOR_ENABLE_COST_BASED_SCHEDULE_GENERATOR
-> EnvironmentalVariable
+
.get(EnvironmentalVariable.ENV_SCHEDULE_GENERATOR_ENABLE_COST_BASED_SCHEDULE_GENERATOR)
+ .get,
+ EnvironmentalVariable.ENV_USER_SYS_ENABLED -> EnvironmentalVariable
+ .get(EnvironmentalVariable.ENV_USER_SYS_ENABLED)
+ .get,
+ EnvironmentalVariable.ENV_MAX_WORKFLOW_WEBSOCKET_REQUEST_PAYLOAD_SIZE_KB
-> EnvironmentalVariable
+
.get(EnvironmentalVariable.ENV_MAX_WORKFLOW_WEBSOCKET_REQUEST_PAYLOAD_SIZE_KB)
+ .get,
+ EnvironmentalVariable.ENV_AUTH_JWT_SECRET -> EnvironmentalVariable
+ .get(EnvironmentalVariable.ENV_AUTH_JWT_SECRET)
+ .get
+ )
case class WorkflowComputingUnitCreationParams(
name: String,