This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5597-387a31693bf63ed50fdb481981485aac70cb9a65
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 84fe8179b7537eb1d562b294ac5f6c511e0fc65e
Author: yangzhang75 <[email protected]>
AuthorDate: Tue Jun 16 13:50:46 2026 -0700

    refactor(engine): pass Python worker startup arguments by name (#5597)
    
    <!--
    Thanks for sending a pull request (PR)! Here are some tips for you:
    1. If this is your first time, please read our contributor guidelines:
    [Contributing to
    Texera](https://github.com/apache/texera/blob/main/CONTRIBUTING.md)
      2. Ensure you have added or run the appropriate tests for your PR
      3. If the PR is work in progress, mark it a draft on GitHub.
      4. Please write your PR title to summarize what this PR proposes, we
        are following Conventional Commits style for PR titles as well.
      5. Be sure to keep the PR description updated to reflect all changes.
    -->
    
    ### What changes were proposed in this PR?
    <!--
    Please clarify what changes you are proposing. The purpose of this
    section
    is to outline the changes. Here are some tips for you:
      1. If you propose a new API, clarify the use case for a new API.
      2. If you fix a bug, you can clarify why it is a bug.
      3. If it is a refactoring, clarify what has been changed.
      3. It would be helpful to include a before-and-after comparison using
         screenshots or GIFs.
      4. Please consider writing useful notes for better and faster reviews.
    -->
    
    Passes Python worker startup configuration by name instead of by argv
    position, as proposed in the issue.
    
    PythonWorkflowWorker (JVM) previously built ~19 positional command-line
    arguments, and texera_run_python_worker.py unpacked them positionally.
    Because the two sides agreed only by index, adding/removing/reordering
    one argument could silently misassign values.
    
    - PythonWorkflowWorker.scala: build a single JSON object of named
    startup-config keys and pass it as one argument.
    - texera_run_python_worker.py: parse that JSON and read each value by
    key; a missing or renamed key now raises a clear KeyError instead of
    silently misaligning.
    
    The set of keys written on the Scala side and read on the Python side is
    identical (19 keys). No behavior change otherwise.
    
    ### Any related issues, documentation, discussions?
    <!--
    Please use this section to link other resources if not mentioned
    already.
    1. If this PR fixes an issue, please include `Fixes #1234`, `Resolves
    #1234`
    or `Closes #1234`. If it is only related, simply mention the issue
    number.
      2. If there is design documentation, please add the link.
      3. If there is a discussion in the mailing list, please add the link.
    -->
    Closes #5547
    
    ### How was this PR tested?
    <!--
    If tests were added, say they were added here. Or simply mention that if
    the PR
    is tested with existing test cases. Make sure to include/update test
    cases that
    check the changes thoroughly including negative and positive cases if
    possible.
    If it was tested in a way different from regular unit tests, please
    clarify how
    you tested step by step, ideally copy and paste-able, so that other
    reviewers can
    test and check, and descendants can verify in the future. If tests were
    not added,
    please describe why they were not added and/or why it was difficult to
    add.
    -->
    - Verified the JSON key set written by PythonWorkflowWorker.scala
    exactly matches the keys read in texera_run_python_worker.py (19 keys,
    no drift).
    - `WorkflowExecutionService/compile` (amber) succeeds.
    - `scalafmtCheckAll` passes; scalafix rules (RemoveUnused,
    ProcedureSyntax) are satisfied (the new import is used).
    - Python entry script passes `py_compile`.
    - End-to-end worker launch is exercised by the existing CI integration
    jobs (amber-integration), which start a real Python worker.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    <!--
    If generative AI tooling has been used in the process of authoring this
    PR,
    please include the phrase: 'Generated-by: ' followed by the name of the
    tool
    and its version. If no, write 'No'.
    Please refer to the [ASF Generative Tooling
    Guidance](https://www.apache.org/legal/generative-tooling.html) for
    details.
    -->
    Generated-by: Claude Code (Claude Opus 4.8)
---
 amber/src/main/python/texera_run_python_worker.py  | 130 +++++++++++-----
 .../pythonworker/PythonWorkflowWorker.scala        |  91 +++++++++---
 amber/src/test/python/test_run_python_worker.py    | 165 +++++++++++++++++++++
 .../PythonWorkflowWorkerStartupConfigSpec.scala    |  91 ++++++++++++
 4 files changed, 415 insertions(+), 62 deletions(-)

diff --git a/amber/src/main/python/texera_run_python_worker.py 
b/amber/src/main/python/texera_run_python_worker.py
index 5c3e25e096..1cbec51a04 100644
--- a/amber/src/main/python/texera_run_python_worker.py
+++ b/amber/src/main/python/texera_run_python_worker.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import json
 import sys
 from loguru import logger
 
@@ -49,54 +50,103 @@ def init_loguru_logger(stream_log_level) -> None:
     logger.add(sys.stderr, level=stream_log_level)
 
 
-if __name__ == "__main__":
-    (
-        _,
-        worker_id,
-        output_port,
-        logger_level,
-        r_path,
-        iceberg_catalog_type,
-        iceberg_postgres_catalog_uri_without_scheme,
-        iceberg_postgres_catalog_username,
-        iceberg_postgres_catalog_password,
-        iceberg_rest_catalog_uri,
-        iceberg_rest_catalog_warehouse_name,
-        iceberg_table_namespace,
-        iceberg_table_state_namespace,
-        iceberg_file_storage_directory_path,
-        iceberg_table_commit_batch_size,
-        s3_endpoint,
-        s3_region,
-        s3_auth_username,
-        s3_auth_password,
-        s3_large_binaries_base_uri,
-    ) = sys.argv
-    init_loguru_logger(logger_level)
+# Keys the JVM side (PythonWorkflowWorker) sends in the startup-config JSON.
+# Declared here so any drift between the two sides fails loudly instead of 
being
+# silently misassigned, as could happen with the previous positional unpacking.
+EXPECTED_CONFIG_KEYS = frozenset(
+    {
+        "workerId",
+        "outputPort",
+        "loggerLevel",
+        "rPath",
+        "icebergCatalogType",
+        "icebergPostgresCatalogUriWithoutScheme",
+        "icebergPostgresCatalogUsername",
+        "icebergPostgresCatalogPassword",
+        "icebergRestCatalogUri",
+        "icebergRestCatalogWarehouseName",
+        "icebergTableNamespace",
+        "icebergTableStateNamespace",
+        "icebergFileStorageDirectoryPath",
+        "icebergTableCommitBatchSize",
+        "s3Endpoint",
+        "s3Region",
+        "s3AuthUsername",
+        "s3AuthPassword",
+        "s3LargeBinariesBaseUri",
+    }
+)
+
+
+def parse_startup_config(raw_config: str) -> dict:
+    """Parse and validate the JSON startup configuration.
+
+    The configuration is passed by name (see PythonWorkflowWorker on the JVM
+    side), so the two sides must agree on an exact key set. Key order is
+    irrelevant since it is a JSON object. Any drift fails loudly:
+      - a missing or unexpected key raises ValueError;
+      - a non-string value raises TypeError.
+    """
+    config = json.loads(raw_config)
+    if not isinstance(config, dict):
+        raise TypeError(
+            f"startup config must be a JSON object, got 
{type(config).__name__}"
+        )
+
+    actual_keys = set(config)
+    missing = EXPECTED_CONFIG_KEYS - actual_keys
+    unexpected = actual_keys - EXPECTED_CONFIG_KEYS
+    if missing or unexpected:
+        raise ValueError(
+            f"startup config key mismatch: missing={sorted(missing)}, "
+            f"unexpected={sorted(unexpected)}"
+        )
+
+    non_string_keys = sorted(k for k, v in config.items() if not isinstance(v, 
str))
+    if non_string_keys:
+        raise TypeError(
+            f"startup config values must be strings; non-string keys: 
{non_string_keys}"
+        )
+
+    return config
+
+
+def main(raw_config: str) -> None:
+    """Start a Python worker from its validated JSON startup configuration."""
+    config = parse_startup_config(raw_config)
+
+    init_loguru_logger(config["loggerLevel"])
     StorageConfig.initialize(
-        iceberg_catalog_type,
-        iceberg_postgres_catalog_uri_without_scheme,
-        iceberg_postgres_catalog_username,
-        iceberg_postgres_catalog_password,
-        iceberg_rest_catalog_uri,
-        iceberg_rest_catalog_warehouse_name,
-        iceberg_table_namespace,
-        iceberg_table_state_namespace,
-        iceberg_file_storage_directory_path,
-        iceberg_table_commit_batch_size,
-        s3_endpoint,
-        s3_region,
-        s3_auth_username,
-        s3_auth_password,
-        s3_large_binaries_base_uri,
+        config["icebergCatalogType"],
+        config["icebergPostgresCatalogUriWithoutScheme"],
+        config["icebergPostgresCatalogUsername"],
+        config["icebergPostgresCatalogPassword"],
+        config["icebergRestCatalogUri"],
+        config["icebergRestCatalogWarehouseName"],
+        config["icebergTableNamespace"],
+        config["icebergTableStateNamespace"],
+        config["icebergFileStorageDirectoryPath"],
+        config["icebergTableCommitBatchSize"],
+        config["s3Endpoint"],
+        config["s3Region"],
+        config["s3AuthUsername"],
+        config["s3AuthPassword"],
+        config["s3LargeBinariesBaseUri"],
     )
 
     # Setting R_HOME environment variable for R-UDF usage
+    r_path = config["rPath"]
     if r_path:
         import os
 
         os.environ["R_HOME"] = r_path
 
     PythonWorker(
-        worker_id=worker_id, host="localhost", output_port=int(output_port)
+        worker_id=config["workerId"],
+        host="localhost",
+        output_port=int(config["outputPort"]),
     ).run()
+
+
+if __name__ == "__main__":
+    main(sys.argv[1])
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
index bd9f052b06..3639ce96a6 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
@@ -39,6 +39,7 @@ import 
org.apache.texera.amber.engine.common.actormessage.{Backpressure, CreditU
 import 
org.apache.texera.amber.engine.common.ambermessage.WorkflowMessage.getInMemSize
 import org.apache.texera.amber.engine.common.ambermessage._
 import org.apache.texera.amber.engine.common.{CheckpointState, Utils}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
 
 import java.nio.file.Path
 import org.apache.texera.web.resource.pythonvirtualenvironment.PveManager
@@ -47,6 +48,62 @@ import scala.sys.process.{BasicIO, Process}
 
 object PythonWorkflowWorker {
   def props(workerConfig: WorkerConfig): Props = Props(new 
PythonWorkflowWorker(workerConfig))
+
+  /**
+    * Serialize the Python worker startup configuration to a JSON object, 
keyed by
+    * name. Built from a sequence of (key, value) pairs so a duplicate key 
fails
+    * loudly here instead of being silently dropped by Map construction.
+    */
+  def encodeStartupConfig(entries: Seq[(String, String)]): String = {
+    val duplicateKeys = entries.groupBy(_._1).collect { case (key, group) if 
group.size > 1 => key }
+    require(
+      duplicateKeys.isEmpty,
+      s"duplicate Python worker startup config keys: 
${duplicateKeys.mkString(", ")}"
+    )
+    objectMapper.writeValueAsString(entries.toMap)
+  }
+
+  /**
+    * Assemble the Python worker startup configuration as named (key, value) 
pairs.
+    * Worker-specific values are passed in; storage-related values are read 
from the
+    * shared StorageConfig (Postgres/REST catalog fields are blank unless that 
catalog
+    * type is active). Returned as a sequence (not a Map) so 
encodeStartupConfig can
+    * detect a duplicate key.
+    */
+  def buildStartupConfig(
+      workerId: String,
+      outputPort: String,
+      rPath: String,
+      largeBinaryBaseUri: String
+  ): Seq[(String, String)] = {
+    val isPostgres = StorageConfig.icebergCatalogType == "postgres"
+    val isRest = StorageConfig.icebergCatalogType == "rest"
+    Seq(
+      "workerId" -> workerId,
+      "outputPort" -> outputPort,
+      "loggerLevel" -> UdfConfig.pythonLogStreamHandlerLevel,
+      "rPath" -> rPath,
+      "icebergCatalogType" -> StorageConfig.icebergCatalogType,
+      "icebergPostgresCatalogUriWithoutScheme" ->
+        (if (isPostgres) StorageConfig.icebergPostgresCatalogUriWithoutScheme 
else ""),
+      "icebergPostgresCatalogUsername" ->
+        (if (isPostgres) StorageConfig.icebergPostgresCatalogUsername else ""),
+      "icebergPostgresCatalogPassword" ->
+        (if (isPostgres) StorageConfig.icebergPostgresCatalogPassword else ""),
+      "icebergRestCatalogUri" -> (if (isRest) 
StorageConfig.icebergRESTCatalogUri else ""),
+      "icebergRestCatalogWarehouseName" ->
+        (if (isRest) StorageConfig.icebergRESTCatalogWarehouseName else ""),
+      "icebergTableNamespace" -> StorageConfig.icebergTableResultNamespace,
+      "icebergTableStateNamespace" -> StorageConfig.icebergTableStateNamespace,
+      "icebergFileStorageDirectoryPath" -> 
StorageConfig.fileStorageDirectoryPath.toString,
+      "icebergTableCommitBatchSize" -> 
StorageConfig.icebergTableCommitBatchSize.toString,
+      "s3Endpoint" -> StorageConfig.s3Endpoint,
+      "s3Region" -> StorageConfig.s3Region,
+      "s3AuthUsername" -> StorageConfig.s3Username,
+      "s3AuthPassword" -> StorageConfig.s3Password,
+      "s3LargeBinariesBaseUri" -> largeBinaryBaseUri
+    )
+  }
 }
 
 class PythonWorkflowWorker(
@@ -184,33 +241,23 @@ class PythonWorkflowWorker(
 
     val pythonBin: String = choosePythonBin()
 
-    // Set the Iceberg related arguments based on the catalog type.
-    val isPostgres = StorageConfig.icebergCatalogType == "postgres"
-    val isRest = StorageConfig.icebergCatalogType == "rest"
+    // Pass startup configuration to the Python worker by name, as a single 
JSON
+    // object, rather than by argv position. This way the two sides agree by 
key,
+    // so adding/removing/reordering a field can no longer silently misassign
+    // values; a missing or renamed key fails loudly on the Python side 
instead.
+    val startupConfig = PythonWorkflowWorker.buildStartupConfig(
+      workerConfig.workerId.name,
+      Integer.toString(pythonProxyServer.getPortNumber.get()),
+      RENVPath,
+      workerConfig.largeBinaryBaseUri
+    )
+
     pythonServerProcess = Process(
       Seq(
         pythonBin,
         "-u",
         udfEntryScriptPath,
-        workerConfig.workerId.name,
-        Integer.toString(pythonProxyServer.getPortNumber.get()),
-        UdfConfig.pythonLogStreamHandlerLevel,
-        RENVPath,
-        StorageConfig.icebergCatalogType,
-        if (isPostgres) StorageConfig.icebergPostgresCatalogUriWithoutScheme 
else "",
-        if (isPostgres) StorageConfig.icebergPostgresCatalogUsername else "",
-        if (isPostgres) StorageConfig.icebergPostgresCatalogPassword else "",
-        if (isRest) StorageConfig.icebergRESTCatalogUri else "",
-        if (isRest) StorageConfig.icebergRESTCatalogWarehouseName else "",
-        StorageConfig.icebergTableResultNamespace,
-        StorageConfig.icebergTableStateNamespace,
-        StorageConfig.fileStorageDirectoryPath.toString,
-        StorageConfig.icebergTableCommitBatchSize.toString,
-        StorageConfig.s3Endpoint,
-        StorageConfig.s3Region,
-        StorageConfig.s3Username,
-        StorageConfig.s3Password,
-        workerConfig.largeBinaryBaseUri
+        PythonWorkflowWorker.encodeStartupConfig(startupConfig)
       )
     ).run(BasicIO.standard(false))
   }
diff --git a/amber/src/test/python/test_run_python_worker.py 
b/amber/src/test/python/test_run_python_worker.py
new file mode 100644
index 0000000000..3cc41bc9e2
--- /dev/null
+++ b/amber/src/test/python/test_run_python_worker.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+from unittest import mock
+
+import pytest
+
+import texera_run_python_worker as entry
+
+
+def _full_config() -> dict:
+    """A complete startup config matching the keys PythonWorkflowWorker 
emits."""
+    return {
+        "workerId": "worker-1",
+        "outputPort": "5005",
+        "loggerLevel": "INFO",
+        "rPath": "",
+        "icebergCatalogType": "postgres",
+        "icebergPostgresCatalogUriWithoutScheme": "host:5432/db",
+        "icebergPostgresCatalogUsername": "pg-user",
+        "icebergPostgresCatalogPassword": "pg-pass",
+        "icebergRestCatalogUri": "",
+        "icebergRestCatalogWarehouseName": "",
+        "icebergTableNamespace": "result_ns",
+        "icebergTableStateNamespace": "state_ns",
+        "icebergFileStorageDirectoryPath": "/tmp/files",
+        "icebergTableCommitBatchSize": "100",
+        "s3Endpoint": "http://s3:9000";,
+        "s3Region": "us-west-2",
+        "s3AuthUsername": "s3-user",
+        "s3AuthPassword": "s3-pass",
+        "s3LargeBinariesBaseUri": "s3://bucket/base",
+    }
+
+
+def _patched_collaborators():
+    """Patch the heavy collaborators so main() exercises only the config 
wiring."""
+    return (
+        mock.patch.object(entry, "StorageConfig"),
+        mock.patch.object(entry, "PythonWorker"),
+        mock.patch.object(entry, "init_loguru_logger"),
+    )
+
+
+def test_full_config_keys_match_the_expected_set():
+    # Guards against the sample config in this test drifting from the contract.
+    assert set(_full_config()) == set(entry.EXPECTED_CONFIG_KEYS)
+
+
+def test_main_maps_named_config_to_storage_and_worker():
+    """Each named field reaches the correct StorageConfig.initialize argument 
and
+    worker parameter — guarding against the silent misalignment that positional
+    argv passing allowed."""
+    config = _full_config()
+    storage_patch, worker_patch, _logger_patch = _patched_collaborators()
+    with storage_patch as storage_config, worker_patch as python_worker, 
_logger_patch:
+        entry.main(json.dumps(config))
+
+    storage_config.initialize.assert_called_once_with(
+        "postgres",
+        "host:5432/db",
+        "pg-user",
+        "pg-pass",
+        "",
+        "",
+        "result_ns",
+        "state_ns",
+        "/tmp/files",
+        "100",
+        "http://s3:9000";,
+        "us-west-2",
+        "s3-user",
+        "s3-pass",
+        "s3://bucket/base",
+    )
+    python_worker.assert_called_once_with(
+        worker_id="worker-1", host="localhost", output_port=5005
+    )
+    python_worker.return_value.run.assert_called_once()
+
+
+def test_main_mapping_is_independent_of_key_order():
+    """Reordering the JSON keys must not change where values land (it is a 
dict)."""
+    reordered = dict(reversed(list(_full_config().items())))
+    storage_patch, worker_patch, _logger_patch = _patched_collaborators()
+    with storage_patch as storage_config, worker_patch as python_worker, 
_logger_patch:
+        entry.main(json.dumps(reordered))
+
+    storage_config.initialize.assert_called_once_with(
+        "postgres",
+        "host:5432/db",
+        "pg-user",
+        "pg-pass",
+        "",
+        "",
+        "result_ns",
+        "state_ns",
+        "/tmp/files",
+        "100",
+        "http://s3:9000";,
+        "us-west-2",
+        "s3-user",
+        "s3-pass",
+        "s3://bucket/base",
+    )
+    python_worker.assert_called_once_with(
+        worker_id="worker-1", host="localhost", output_port=5005
+    )
+
+
+def test_main_sets_r_home_when_r_path_present(monkeypatch):
+    monkeypatch.delenv("R_HOME", raising=False)
+    config = _full_config()
+    config["rPath"] = "/opt/R"
+    storage_patch, worker_patch, _logger_patch = _patched_collaborators()
+    with storage_patch, worker_patch, _logger_patch:
+        import os
+
+        entry.main(json.dumps(config))
+        assert os.environ["R_HOME"] == "/opt/R"
+
+
[email protected]("missing_key", sorted(_full_config().keys()))
+def test_parse_rejects_a_missing_key(missing_key):
+    """A missing key fails loudly rather than being silently misassigned."""
+    config = _full_config()
+    del config[missing_key]
+    with pytest.raises(ValueError, match="key mismatch"):
+        entry.parse_startup_config(json.dumps(config))
+
+
+def test_parse_rejects_an_unexpected_key():
+    """An extra key (e.g. the JVM side added a field) fails instead of being 
ignored."""
+    config = _full_config()
+    config["someNewField"] = "value"
+    with pytest.raises(ValueError, match="key mismatch"):
+        entry.parse_startup_config(json.dumps(config))
+
+
+def test_parse_rejects_a_non_string_value():
+    """A wrongly-typed value (e.g. a number instead of a string) fails."""
+    config = _full_config()
+    config["outputPort"] = 5005  # number instead of the expected string
+    with pytest.raises(TypeError, match="must be strings"):
+        entry.parse_startup_config(json.dumps(config))
+
+
+def test_parse_rejects_a_non_object_payload():
+    with pytest.raises(TypeError, match="must be a JSON object"):
+        entry.parse_startup_config(json.dumps(["not", "an", "object"]))
diff --git 
a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorkerStartupConfigSpec.scala
 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorkerStartupConfigSpec.scala
new file mode 100644
index 0000000000..26f6cb937c
--- /dev/null
+++ 
b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorkerStartupConfigSpec.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.pythonworker
+
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+class PythonWorkflowWorkerStartupConfigSpec extends AnyFlatSpec {
+
+  "encodeStartupConfig" should "serialize entries to a JSON object keyed by 
name" in {
+    val json = PythonWorkflowWorker.encodeStartupConfig(
+      Seq("workerId" -> "w-1", "outputPort" -> "5005", "s3Region" -> 
"us-west-2")
+    )
+    val parsed = objectMapper.readValue(json, classOf[java.util.Map[String, 
String]])
+    assert(parsed.get("workerId") == "w-1")
+    assert(parsed.get("outputPort") == "5005")
+    assert(parsed.get("s3Region") == "us-west-2")
+    assert(parsed.size() == 3)
+  }
+
+  it should "fail loudly when the same key appears more than once" in {
+    val exception = intercept[IllegalArgumentException] {
+      PythonWorkflowWorker.encodeStartupConfig(
+        Seq("s3Region" -> "us-west-2", "s3Region" -> "us-east-1")
+      )
+    }
+    assert(exception.getMessage.contains("duplicate"))
+  }
+
+  private val expectedKeys = Set(
+    "workerId",
+    "outputPort",
+    "loggerLevel",
+    "rPath",
+    "icebergCatalogType",
+    "icebergPostgresCatalogUriWithoutScheme",
+    "icebergPostgresCatalogUsername",
+    "icebergPostgresCatalogPassword",
+    "icebergRestCatalogUri",
+    "icebergRestCatalogWarehouseName",
+    "icebergTableNamespace",
+    "icebergTableStateNamespace",
+    "icebergFileStorageDirectoryPath",
+    "icebergTableCommitBatchSize",
+    "s3Endpoint",
+    "s3Region",
+    "s3AuthUsername",
+    "s3AuthPassword",
+    "s3LargeBinariesBaseUri"
+  )
+
+  "buildStartupConfig" should "produce exactly the expected named keys with 
the worker values" in {
+    val config =
+      PythonWorkflowWorker.buildStartupConfig("worker-7", "6000", "/opt/R", 
"s3://bucket/uri")
+    val map = config.toMap
+
+    assert(config.size == expectedKeys.size, "no duplicate or missing keys")
+    assert(map.keySet == expectedKeys)
+    assert(map("workerId") == "worker-7")
+    assert(map("outputPort") == "6000")
+    assert(map("rPath") == "/opt/R")
+    assert(map("s3LargeBinariesBaseUri") == "s3://bucket/uri")
+  }
+
+  it should "produce a config that round-trips through encodeStartupConfig" in 
{
+    val json = PythonWorkflowWorker.encodeStartupConfig(
+      PythonWorkflowWorker.buildStartupConfig("w", "1", "", "uri")
+    )
+    val parsed = objectMapper.readValue(json, classOf[java.util.Map[String, 
String]])
+    assert(parsed.get("workerId") == "w")
+    assert(parsed.get("s3LargeBinariesBaseUri") == "uri")
+    assert(parsed.size() == expectedKeys.size)
+  }
+}

Reply via email to