This is an automated email from the ASF dual-hosted git repository.

mengw15 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new b5a18a0f28 fix: pass different environment variables to computing unit 
and Python worker based on the catalog type (#4478)
b5a18a0f28 is described below

commit b5a18a0f28e899c9528fb874357d092ea3cf2c61
Author: Meng Wang <[email protected]>
AuthorDate: Thu Apr 23 16:18:44 2026 -0700

    fix: pass different environment variables to computing unit and Python 
worker based on the catalog type (#4478)
    
    ### What changes were proposed in this PR?
    
    When the active Iceberg catalog is rest, only REST credentials are
    forwarded across process
    boundaries; when it is postgres, only Postgres credentials are
    forwarded. Previously both backends'
    credentials were forwarded unconditionally, exposing dead-weight
    credentials to consumers that never
    use them.
    
    Two transmission paths are gated:
    
    1. K8s CU pod env vars — ComputingUnitManagingResource.scala: extract
    icebergEnvironmentVariables def
    that branches on StorageConfig.icebergCatalogType and only emits the
    active backend's env keys
    (STORAGE_ICEBERG_CATALOG_REST_* for rest;
    STORAGE_ICEBERG_CATALOG_POSTGRES_* for postgres).
    2. Python UDF subprocess argv — PythonWorkflowWorker.scala: the 6
    iceberg credential slots passed to
    texera_run_python_worker.py are gated by catalog type. The inactive
    backend's slots are passed as
    empty strings to keep the positional-arg layout stable, so the Python
    entrypoint needs no change (it
    already only consumes the active backend's fields via
    iceberg_catalog_instance.py).
     ### Any related issues, documentation, discussions?
      Closes #4477
    
     ### How was this PR tested?
    
    - sbt ComputingUnitManagingService/compile
    WorkflowExecutionService/compile passes.
      - Manual verification
    
     ### Was this PR authored or co-authored using generative AI tooling?
      Generated-by: Claude Code (Claude Opus 4.7)
    
    ---------
    
    Co-authored-by: Chen Li <[email protected]>
---
 .../pythonworker/PythonWorkflowWorker.scala        | 13 +--
 .../resource/ComputingUnitManagingResource.scala   | 98 ++++++++++++----------
 2 files changed, 64 insertions(+), 47 deletions(-)

diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
index d2bc5f5025..32e417f3c0 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
@@ -168,6 +168,9 @@ class PythonWorkflowWorker(
   private def startPythonProcess(): Unit = {
     val udfEntryScriptPath: String =
       pythonSrcDirectory.resolve("texera_run_python_worker.py").toString
+    // Set the Iceberg related arguments based on the catalog type.
+    val isPostgres = StorageConfig.icebergCatalogType == "postgres"
+    val isRest = StorageConfig.icebergCatalogType == "rest"
     pythonServerProcess = Process(
       Seq(
         if (pythonENVPath.isEmpty) "python3"
@@ -179,11 +182,11 @@ class PythonWorkflowWorker(
         UdfConfig.pythonLogStreamHandlerLevel,
         RENVPath,
         StorageConfig.icebergCatalogType,
-        StorageConfig.icebergPostgresCatalogUriWithoutScheme,
-        StorageConfig.icebergPostgresCatalogUsername,
-        StorageConfig.icebergPostgresCatalogPassword,
-        StorageConfig.icebergRESTCatalogUri,
-        StorageConfig.icebergRESTCatalogWarehouseName,
+        if (isPostgres) StorageConfig.icebergPostgresCatalogUriWithoutScheme 
else "",
+        if (isPostgres) StorageConfig.icebergPostgresCatalogUsername else "",
+        if (isPostgres) StorageConfig.icebergPostgresCatalogPassword else "",
+        if (isRest) StorageConfig.icebergRESTCatalogUri else "",
+        if (isRest) StorageConfig.icebergRESTCatalogWarehouseName else "",
         StorageConfig.icebergTableResultNamespace,
         StorageConfig.fileStorageDirectoryPath.toString,
         StorageConfig.icebergTableCommitBatchSize.toString,
diff --git 
a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
 
b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
index bc4aa32ef2..2d1b312b0e 100644
--- 
a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
+++ 
b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala
@@ -65,49 +65,63 @@ object ComputingUnitManagingResource {
       .getInstance()
       .createDSLContext()
 
+  private def icebergEnvironmentVariables: Map[String, Any] = {
+    val base = Map[String, Any](
+      EnvironmentalVariable.ENV_ICEBERG_CATALOG_TYPE -> 
StorageConfig.icebergCatalogType
+    )
+    StorageConfig.icebergCatalogType match {
+      case "rest" =>
+        base ++ Map(
+          EnvironmentalVariable.ENV_ICEBERG_CATALOG_REST_URI -> 
StorageConfig.icebergRESTCatalogUri,
+          EnvironmentalVariable.ENV_ICEBERG_CATALOG_REST_WAREHOUSE_NAME -> 
StorageConfig.icebergRESTCatalogWarehouseName
+        )
+      case "postgres" =>
+        base ++ Map(
+          
EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME -> 
StorageConfig.icebergPostgresCatalogUriWithoutScheme,
+          EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_USERNAME -> 
StorageConfig.icebergPostgresCatalogUsername,
+          EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_PASSWORD -> 
StorageConfig.icebergPostgresCatalogPassword
+        )
+      case _ => base
+    }
+  }
+
   // Environment variables passed to the created computing unit(pod)
-  private lazy val computingUnitEnvironmentVariables: Map[String, Any] = Map(
-    // Variables for saving results to Iceberg
-    EnvironmentalVariable.ENV_ICEBERG_CATALOG_TYPE -> 
StorageConfig.icebergCatalogType,
-    EnvironmentalVariable.ENV_ICEBERG_CATALOG_REST_URI -> 
StorageConfig.icebergRESTCatalogUri,
-    EnvironmentalVariable.ENV_ICEBERG_CATALOG_REST_WAREHOUSE_NAME -> 
StorageConfig.icebergRESTCatalogWarehouseName,
-    EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_URI_WITHOUT_SCHEME -> 
StorageConfig.icebergPostgresCatalogUriWithoutScheme,
-    EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_USERNAME -> 
StorageConfig.icebergPostgresCatalogUsername,
-    EnvironmentalVariable.ENV_ICEBERG_CATALOG_POSTGRES_PASSWORD -> 
StorageConfig.icebergPostgresCatalogPassword,
-    // Variables for saving the metadata of the results, i.e. URIs of 
results/stats
-    EnvironmentalVariable.ENV_JDBC_URL -> StorageConfig.jdbcUrl,
-    EnvironmentalVariable.ENV_JDBC_USERNAME -> StorageConfig.jdbcUsername,
-    EnvironmentalVariable.ENV_JDBC_PASSWORD -> StorageConfig.jdbcPassword,
-    // Variables for reading files & exporting results
-    // LakeFS endpoint is passed to CU to make CU work in dev mode(using 
localhost & using default LakeFS credentials)
-    // LakeFS credentials should NOT be passed to CU
-    EnvironmentalVariable.ENV_LAKEFS_ENDPOINT -> StorageConfig.lakefsEndpoint,
-    // S3 variables are passed to CU for R UDF large binary support
-    EnvironmentalVariable.ENV_S3_ENDPOINT -> StorageConfig.s3Endpoint,
-    EnvironmentalVariable.ENV_S3_REGION -> StorageConfig.s3Region,
-    EnvironmentalVariable.ENV_S3_AUTH_USERNAME -> StorageConfig.s3Username,
-    EnvironmentalVariable.ENV_S3_AUTH_PASSWORD -> StorageConfig.s3Password,
-    EnvironmentalVariable.ENV_FILE_SERVICE_GET_PRESIGNED_URL_ENDPOINT -> 
EnvironmentalVariable
-      .get(EnvironmentalVariable.ENV_FILE_SERVICE_GET_PRESIGNED_URL_ENDPOINT)
-      .get,
-    EnvironmentalVariable.ENV_FILE_SERVICE_UPLOAD_ONE_FILE_TO_DATASET_ENDPOINT 
-> EnvironmentalVariable
-      
.get(EnvironmentalVariable.ENV_FILE_SERVICE_UPLOAD_ONE_FILE_TO_DATASET_ENDPOINT)
-      .get,
-    // Variables for amber setting
-    // TODO: use AmberConfig for the following items. Currently AmberConfig is 
only accessible in workflow-executing-service
-    
EnvironmentalVariable.ENV_SCHEDULE_GENERATOR_ENABLE_COST_BASED_SCHEDULE_GENERATOR
 -> EnvironmentalVariable
-      
.get(EnvironmentalVariable.ENV_SCHEDULE_GENERATOR_ENABLE_COST_BASED_SCHEDULE_GENERATOR)
-      .get,
-    EnvironmentalVariable.ENV_USER_SYS_ENABLED -> EnvironmentalVariable
-      .get(EnvironmentalVariable.ENV_USER_SYS_ENABLED)
-      .get,
-    EnvironmentalVariable.ENV_MAX_WORKFLOW_WEBSOCKET_REQUEST_PAYLOAD_SIZE_KB 
-> EnvironmentalVariable
-      
.get(EnvironmentalVariable.ENV_MAX_WORKFLOW_WEBSOCKET_REQUEST_PAYLOAD_SIZE_KB)
-      .get,
-    EnvironmentalVariable.ENV_AUTH_JWT_SECRET -> EnvironmentalVariable
-      .get(EnvironmentalVariable.ENV_AUTH_JWT_SECRET)
-      .get
-  )
+  private lazy val computingUnitEnvironmentVariables: Map[String, Any] =
+    icebergEnvironmentVariables ++ Map(
+      // Variables for saving the metadata of the results, i.e. URIs of 
results/stats
+      EnvironmentalVariable.ENV_JDBC_URL -> StorageConfig.jdbcUrl,
+      EnvironmentalVariable.ENV_JDBC_USERNAME -> StorageConfig.jdbcUsername,
+      EnvironmentalVariable.ENV_JDBC_PASSWORD -> StorageConfig.jdbcPassword,
+      // Variables for reading files & exporting results
+      // LakeFS endpoint is passed to CU to make CU work in dev mode(using 
localhost & using default LakeFS credentials)
+      // LakeFS credentials should NOT be passed to CU
+      EnvironmentalVariable.ENV_LAKEFS_ENDPOINT -> 
StorageConfig.lakefsEndpoint,
+      // S3 variables are passed to CU for R UDF large binary support
+      EnvironmentalVariable.ENV_S3_ENDPOINT -> StorageConfig.s3Endpoint,
+      EnvironmentalVariable.ENV_S3_REGION -> StorageConfig.s3Region,
+      EnvironmentalVariable.ENV_S3_AUTH_USERNAME -> StorageConfig.s3Username,
+      EnvironmentalVariable.ENV_S3_AUTH_PASSWORD -> StorageConfig.s3Password,
+      EnvironmentalVariable.ENV_FILE_SERVICE_GET_PRESIGNED_URL_ENDPOINT -> 
EnvironmentalVariable
+        .get(EnvironmentalVariable.ENV_FILE_SERVICE_GET_PRESIGNED_URL_ENDPOINT)
+        .get,
+      
EnvironmentalVariable.ENV_FILE_SERVICE_UPLOAD_ONE_FILE_TO_DATASET_ENDPOINT -> 
EnvironmentalVariable
+        
.get(EnvironmentalVariable.ENV_FILE_SERVICE_UPLOAD_ONE_FILE_TO_DATASET_ENDPOINT)
+        .get,
+      // Variables for amber setting
+      // TODO: use AmberConfig for the following items. Currently AmberConfig 
is only accessible in workflow-executing-service
+      
EnvironmentalVariable.ENV_SCHEDULE_GENERATOR_ENABLE_COST_BASED_SCHEDULE_GENERATOR
 -> EnvironmentalVariable
+        
.get(EnvironmentalVariable.ENV_SCHEDULE_GENERATOR_ENABLE_COST_BASED_SCHEDULE_GENERATOR)
+        .get,
+      EnvironmentalVariable.ENV_USER_SYS_ENABLED -> EnvironmentalVariable
+        .get(EnvironmentalVariable.ENV_USER_SYS_ENABLED)
+        .get,
+      EnvironmentalVariable.ENV_MAX_WORKFLOW_WEBSOCKET_REQUEST_PAYLOAD_SIZE_KB 
-> EnvironmentalVariable
+        
.get(EnvironmentalVariable.ENV_MAX_WORKFLOW_WEBSOCKET_REQUEST_PAYLOAD_SIZE_KB)
+        .get,
+      EnvironmentalVariable.ENV_AUTH_JWT_SECRET -> EnvironmentalVariable
+        .get(EnvironmentalVariable.ENV_AUTH_JWT_SECRET)
+        .get
+    )
 
   case class WorkflowComputingUnitCreationParams(
       name: String,

Reply via email to