This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5597-891d2adbc8d096774121150eb079dfacdfded161
in repository https://gitbox.apache.org/repos/asf/texera.git
commit 249d115bd8e1df957466fa0ccb2e3a7a133989cd
Author: yangzhang75 <[email protected]>
AuthorDate: Sun Jun 14 23:51:55 2026 -0700

refactor(engine): pass Python worker startup arguments by name (#5597)

### What changes were proposed in this PR?

Passes Python worker startup configuration by name instead of by argv position, as proposed in the issue.

`PythonWorkflowWorker` (JVM) previously built 19 positional command-line arguments, and `texera_run_python_worker.py` unpacked them positionally. Because the two sides agreed only by index, adding, removing, or reordering one argument could silently misassign values.

- `PythonWorkflowWorker.scala`: build the startup configuration as named (key, value) pairs, serialize them into a single JSON object (a duplicate key is rejected), and pass that object as one argument.
- `texera_run_python_worker.py`: parse that JSON, check it against the expected key set, and read each value by key. A missing, renamed, or unexpected key now raises a `ValueError`, and a non-string value raises a `TypeError`, instead of silently misaligning.
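The contract in these bullets can be illustrated with a minimal Python sketch. It is not the Texera code: the three-key `EXPECTED_KEYS` set and the `encode_startup_config` / `parse_startup_config` helpers below are simplified, hypothetical stand-ins for the real 19-key contract. The sketch contrasts positional unpacking, which silently accepts swapped arguments, with a keyed JSON object that ignores order and rejects drift.

```python
import json

# Hypothetical three-key contract standing in for the real 19-key set.
EXPECTED_KEYS = frozenset({"workerId", "outputPort", "loggerLevel"})


def encode_startup_config(entries):
    """Sender side: serialize (key, value) pairs into one JSON argument.

    Duplicate keys are rejected because dict() would silently keep the last one.
    """
    keys = [key for key, _ in entries]
    duplicates = sorted({key for key in keys if keys.count(key) > 1})
    if duplicates:
        raise ValueError(f"duplicate startup config keys: {duplicates}")
    return json.dumps(dict(entries))


def parse_startup_config(raw):
    """Receiver side: require a JSON object with exactly the expected string values."""
    config = json.loads(raw)
    if not isinstance(config, dict):
        raise TypeError("startup config must be a JSON object")
    missing = EXPECTED_KEYS - set(config)
    unexpected = set(config) - EXPECTED_KEYS
    if missing or unexpected:
        raise ValueError(
            f"key mismatch: missing={sorted(missing)}, unexpected={sorted(unexpected)}"
        )
    if any(not isinstance(value, str) for value in config.values()):
        raise TypeError("startup config values must be strings")
    return config


# Positional passing: the sender swapped two arguments, yet unpacking still
# succeeds, so worker_id silently receives the port number.
argv = ["worker.py", "5005", "worker-1", "INFO"]
_, worker_id, output_port, logger_level = argv
print(worker_id)  # prints 5005

# Named passing: the order of the pairs does not matter on the receiving side.
raw = encode_startup_config(
    [("outputPort", "5005"), ("workerId", "worker-1"), ("loggerLevel", "INFO")]
)
print(parse_startup_config(raw)["workerId"])  # prints worker-1
```

Dropping the `"loggerLevel"` pair in this sketch makes `parse_startup_config` raise a `ValueError` mentioning `missing=['loggerLevel']`, rather than shifting every later value into the wrong variable.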
The set of keys written on the Scala side and read on the Python side is identical (19 keys). No behavior change otherwise.

### Any related issues, documentation, discussions?

Closes #5547

### How was this PR tested?

- Added `test_run_python_worker.py` (pytest) and `PythonWorkflowWorkerStartupConfigSpec.scala` (ScalaTest), covering the key-to-parameter mapping, key-order independence, and rejection of missing, unexpected, duplicate, non-string, and non-object input.
- Verified that the JSON key set written by `PythonWorkflowWorker.scala` exactly matches the keys read in `texera_run_python_worker.py` (19 keys, no drift).
- `WorkflowExecutionService/compile` (amber) succeeds.
- `scalafmtCheckAll` passes; the scalafix rules (RemoveUnused, ProcedureSyntax) are satisfied (the new import is used).
- The Python entry script passes `py_compile`.
- End-to-end worker launch is exercised by the existing CI integration jobs (amber-integration), which start a real Python worker.

### Was this PR authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.8)
---
 amber/src/main/python/texera_run_python_worker.py  | 130 +++++++++++-----
 .../pythonworker/PythonWorkflowWorker.scala        |  91 +++++++++---
 amber/src/test/python/test_run_python_worker.py    | 165 +++++++++++++++++++++
 .../PythonWorkflowWorkerStartupConfigSpec.scala    |  91 ++++++++++++
 4 files changed, 415 insertions(+), 62 deletions(-)

diff --git a/amber/src/main/python/texera_run_python_worker.py b/amber/src/main/python/texera_run_python_worker.py
index 5c3e25e096..1cbec51a04 100644
--- a/amber/src/main/python/texera_run_python_worker.py
+++ b/amber/src/main/python/texera_run_python_worker.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import json
 import sys
 
 from loguru import logger
@@ -49,54 +50,103 @@ def init_loguru_logger(stream_log_level) -> None:
     logger.add(sys.stderr, level=stream_log_level)
 
 
-if __name__ == "__main__":
-    (
-        _,
-        worker_id,
-        output_port,
-        logger_level,
-        r_path,
-        iceberg_catalog_type,
-        iceberg_postgres_catalog_uri_without_scheme,
-        iceberg_postgres_catalog_username,
-        iceberg_postgres_catalog_password,
-        iceberg_rest_catalog_uri,
-        iceberg_rest_catalog_warehouse_name,
-        iceberg_table_namespace,
-        iceberg_table_state_namespace,
-        iceberg_file_storage_directory_path,
-        iceberg_table_commit_batch_size,
-        s3_endpoint,
-        s3_region,
-        s3_auth_username,
-        s3_auth_password,
-        s3_large_binaries_base_uri,
-    ) = sys.argv
-    init_loguru_logger(logger_level)
+# Keys the JVM side (PythonWorkflowWorker) sends in the startup-config JSON.
+# Declared here so any drift between the two sides fails loudly instead of being
+# silently misassigned, as could happen with the previous positional unpacking.
+EXPECTED_CONFIG_KEYS = frozenset(
+    {
+        "workerId",
+        "outputPort",
+        "loggerLevel",
+        "rPath",
+        "icebergCatalogType",
+        "icebergPostgresCatalogUriWithoutScheme",
+        "icebergPostgresCatalogUsername",
+        "icebergPostgresCatalogPassword",
+        "icebergRestCatalogUri",
+        "icebergRestCatalogWarehouseName",
+        "icebergTableNamespace",
+        "icebergTableStateNamespace",
+        "icebergFileStorageDirectoryPath",
+        "icebergTableCommitBatchSize",
+        "s3Endpoint",
+        "s3Region",
+        "s3AuthUsername",
+        "s3AuthPassword",
+        "s3LargeBinariesBaseUri",
+    }
+)
+
+
+def parse_startup_config(raw_config: str) -> dict:
+    """Parse and validate the JSON startup configuration.
+
+    The configuration is passed by name (see PythonWorkflowWorker on the JVM
+    side), so the two sides must agree on an exact key set. Key order is
+    irrelevant since it is a JSON object. Any drift fails loudly:
+      - a missing or unexpected key raises ValueError;
+      - a non-string value raises TypeError.
+    """
+    config = json.loads(raw_config)
+    if not isinstance(config, dict):
+        raise TypeError(
+            f"startup config must be a JSON object, got {type(config).__name__}"
+        )
+
+    actual_keys = set(config)
+    missing = EXPECTED_CONFIG_KEYS - actual_keys
+    unexpected = actual_keys - EXPECTED_CONFIG_KEYS
+    if missing or unexpected:
+        raise ValueError(
+            f"startup config key mismatch: missing={sorted(missing)}, "
+            f"unexpected={sorted(unexpected)}"
+        )
+
+    non_string_keys = sorted(k for k, v in config.items() if not isinstance(v, str))
+    if non_string_keys:
+        raise TypeError(
+            f"startup config values must be strings; non-string keys: {non_string_keys}"
+        )
+
+    return config
+
+
+def main(raw_config: str) -> None:
+    """Start a Python worker from its validated JSON startup configuration."""
+    config = parse_startup_config(raw_config)
+
+    init_loguru_logger(config["loggerLevel"])
     StorageConfig.initialize(
-        iceberg_catalog_type,
-        iceberg_postgres_catalog_uri_without_scheme,
-        iceberg_postgres_catalog_username,
-        iceberg_postgres_catalog_password,
-        iceberg_rest_catalog_uri,
-        iceberg_rest_catalog_warehouse_name,
-        iceberg_table_namespace,
-        iceberg_table_state_namespace,
-        iceberg_file_storage_directory_path,
-        iceberg_table_commit_batch_size,
-        s3_endpoint,
-        s3_region,
-        s3_auth_username,
-        s3_auth_password,
-        s3_large_binaries_base_uri,
+        config["icebergCatalogType"],
+        config["icebergPostgresCatalogUriWithoutScheme"],
+        config["icebergPostgresCatalogUsername"],
+        config["icebergPostgresCatalogPassword"],
+        config["icebergRestCatalogUri"],
+        config["icebergRestCatalogWarehouseName"],
+        config["icebergTableNamespace"],
+        config["icebergTableStateNamespace"],
+        config["icebergFileStorageDirectoryPath"],
+        config["icebergTableCommitBatchSize"],
+        config["s3Endpoint"],
+        config["s3Region"],
+        config["s3AuthUsername"],
+        config["s3AuthPassword"],
+        config["s3LargeBinariesBaseUri"],
     )
 
     # Setting R_HOME environment variable for R-UDF usage
+    r_path = config["rPath"]
     if r_path:
         import os
 
         os.environ["R_HOME"] = r_path
 
     PythonWorker(
-        worker_id=worker_id, host="localhost", output_port=int(output_port)
+        worker_id=config["workerId"],
+        host="localhost",
+        output_port=int(config["outputPort"]),
     ).run()
+
+
+if __name__ == "__main__":
+    main(sys.argv[1])
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
index bd9f052b06..3639ce96a6 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala
@@ -39,6 +39,7 @@ import org.apache.texera.amber.engine.common.actormessage.{Backpressure, CreditU
 import org.apache.texera.amber.engine.common.ambermessage.WorkflowMessage.getInMemSize
 import org.apache.texera.amber.engine.common.ambermessage._
 import org.apache.texera.amber.engine.common.{CheckpointState, Utils}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
 import java.nio.file.Path
 import org.apache.texera.web.resource.pythonvirtualenvironment.PveManager
 
@@ -47,6 +48,62 @@ import scala.sys.process.{BasicIO, Process}
 
 object PythonWorkflowWorker {
   def props(workerConfig: WorkerConfig): Props = Props(new PythonWorkflowWorker(workerConfig))
+
+  /**
+   * Serialize the Python worker startup configuration to a JSON object, keyed by
+   * name. Built from a sequence of (key, value) pairs so a duplicate key fails
+   * loudly here instead of being silently dropped by Map construction.
+   */
+  def encodeStartupConfig(entries: Seq[(String, String)]): String = {
+    val duplicateKeys = entries.groupBy(_._1).collect { case (key, group) if group.size > 1 => key }
+    require(
+      duplicateKeys.isEmpty,
+      s"duplicate Python worker startup config keys: ${duplicateKeys.mkString(", ")}"
+    )
+    objectMapper.writeValueAsString(entries.toMap)
+  }
+
+  /**
+   * Assemble the Python worker startup configuration as named (key, value) pairs.
+   * Worker-specific values are passed in; storage-related values are read from the
+   * shared StorageConfig (Postgres/REST catalog fields are blank unless that catalog
+   * type is active). Returned as a sequence (not a Map) so encodeStartupConfig can
+   * detect a duplicate key.
+   */
+  def buildStartupConfig(
+      workerId: String,
+      outputPort: String,
+      rPath: String,
+      largeBinaryBaseUri: String
+  ): Seq[(String, String)] = {
+    val isPostgres = StorageConfig.icebergCatalogType == "postgres"
+    val isRest = StorageConfig.icebergCatalogType == "rest"
+    Seq(
+      "workerId" -> workerId,
+      "outputPort" -> outputPort,
+      "loggerLevel" -> UdfConfig.pythonLogStreamHandlerLevel,
+      "rPath" -> rPath,
+      "icebergCatalogType" -> StorageConfig.icebergCatalogType,
+      "icebergPostgresCatalogUriWithoutScheme" ->
+        (if (isPostgres) StorageConfig.icebergPostgresCatalogUriWithoutScheme else ""),
+      "icebergPostgresCatalogUsername" ->
+        (if (isPostgres) StorageConfig.icebergPostgresCatalogUsername else ""),
+      "icebergPostgresCatalogPassword" ->
+        (if (isPostgres) StorageConfig.icebergPostgresCatalogPassword else ""),
+      "icebergRestCatalogUri" -> (if (isRest) StorageConfig.icebergRESTCatalogUri else ""),
+      "icebergRestCatalogWarehouseName" ->
+        (if (isRest) StorageConfig.icebergRESTCatalogWarehouseName else ""),
+      "icebergTableNamespace" -> StorageConfig.icebergTableResultNamespace,
+      "icebergTableStateNamespace" -> StorageConfig.icebergTableStateNamespace,
+      "icebergFileStorageDirectoryPath" -> StorageConfig.fileStorageDirectoryPath.toString,
+      "icebergTableCommitBatchSize" -> StorageConfig.icebergTableCommitBatchSize.toString,
+      "s3Endpoint" -> StorageConfig.s3Endpoint,
+      "s3Region" -> StorageConfig.s3Region,
+      "s3AuthUsername" -> StorageConfig.s3Username,
+      "s3AuthPassword" -> StorageConfig.s3Password,
+      "s3LargeBinariesBaseUri" -> largeBinaryBaseUri
+    )
+  }
 }
 
 class PythonWorkflowWorker(
@@ -184,33 +241,23 @@ class PythonWorkflowWorker(
 
     val pythonBin: String = choosePythonBin()
 
-    // Set the Iceberg related arguments based on the catalog type.
-    val isPostgres = StorageConfig.icebergCatalogType == "postgres"
-    val isRest = StorageConfig.icebergCatalogType == "rest"
+    // Pass startup configuration to the Python worker by name, as a single JSON
+    // object, rather than by argv position. This way the two sides agree by key,
+    // so adding/removing/reordering a field can no longer silently misassign
+    // values; a missing or renamed key fails loudly on the Python side instead.
+    val startupConfig = PythonWorkflowWorker.buildStartupConfig(
+      workerConfig.workerId.name,
+      Integer.toString(pythonProxyServer.getPortNumber.get()),
+      RENVPath,
+      workerConfig.largeBinaryBaseUri
+    )
+
     pythonServerProcess = Process(
       Seq(
         pythonBin,
         "-u",
         udfEntryScriptPath,
-        workerConfig.workerId.name,
-        Integer.toString(pythonProxyServer.getPortNumber.get()),
-        UdfConfig.pythonLogStreamHandlerLevel,
-        RENVPath,
-        StorageConfig.icebergCatalogType,
-        if (isPostgres) StorageConfig.icebergPostgresCatalogUriWithoutScheme else "",
-        if (isPostgres) StorageConfig.icebergPostgresCatalogUsername else "",
-        if (isPostgres) StorageConfig.icebergPostgresCatalogPassword else "",
-        if (isRest) StorageConfig.icebergRESTCatalogUri else "",
-        if (isRest) StorageConfig.icebergRESTCatalogWarehouseName else "",
-        StorageConfig.icebergTableResultNamespace,
-        StorageConfig.icebergTableStateNamespace,
-        StorageConfig.fileStorageDirectoryPath.toString,
-        StorageConfig.icebergTableCommitBatchSize.toString,
-        StorageConfig.s3Endpoint,
-        StorageConfig.s3Region,
-        StorageConfig.s3Username,
-        StorageConfig.s3Password,
-        workerConfig.largeBinaryBaseUri
+        PythonWorkflowWorker.encodeStartupConfig(startupConfig)
       )
     ).run(BasicIO.standard(false))
   }
diff --git a/amber/src/test/python/test_run_python_worker.py b/amber/src/test/python/test_run_python_worker.py
new file mode 100644
index 0000000000..3cc41bc9e2
--- /dev/null
+++ b/amber/src/test/python/test_run_python_worker.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+from unittest import mock
+
+import pytest
+
+import texera_run_python_worker as entry
+
+
+def _full_config() -> dict:
+    """A complete startup config matching the keys PythonWorkflowWorker emits."""
+    return {
+        "workerId": "worker-1",
+        "outputPort": "5005",
+        "loggerLevel": "INFO",
+        "rPath": "",
+        "icebergCatalogType": "postgres",
+        "icebergPostgresCatalogUriWithoutScheme": "host:5432/db",
+        "icebergPostgresCatalogUsername": "pg-user",
+        "icebergPostgresCatalogPassword": "pg-pass",
+        "icebergRestCatalogUri": "",
+        "icebergRestCatalogWarehouseName": "",
+        "icebergTableNamespace": "result_ns",
+        "icebergTableStateNamespace": "state_ns",
+        "icebergFileStorageDirectoryPath": "/tmp/files",
+        "icebergTableCommitBatchSize": "100",
+        "s3Endpoint": "http://s3:9000",
+        "s3Region": "us-west-2",
+        "s3AuthUsername": "s3-user",
+        "s3AuthPassword": "s3-pass",
+        "s3LargeBinariesBaseUri": "s3://bucket/base",
+    }
+
+
+def _patched_collaborators():
+    """Patch the heavy collaborators so main() exercises only the config wiring."""
+    return (
+        mock.patch.object(entry, "StorageConfig"),
+        mock.patch.object(entry, "PythonWorker"),
+        mock.patch.object(entry, "init_loguru_logger"),
+    )
+
+
+def test_full_config_keys_match_the_expected_set():
+    # Guards against the sample config in this test drifting from the contract.
+    assert set(_full_config()) == set(entry.EXPECTED_CONFIG_KEYS)
+
+
+def test_main_maps_named_config_to_storage_and_worker():
+    """Each named field reaches the correct StorageConfig.initialize argument and
+    worker parameter — guarding against the silent misalignment that positional
+    argv passing allowed."""
+    config = _full_config()
+    storage_patch, worker_patch, _logger_patch = _patched_collaborators()
+    with storage_patch as storage_config, worker_patch as python_worker, _logger_patch:
+        entry.main(json.dumps(config))
+
+    storage_config.initialize.assert_called_once_with(
+        "postgres",
+        "host:5432/db",
+        "pg-user",
+        "pg-pass",
+        "",
+        "",
+        "result_ns",
+        "state_ns",
+        "/tmp/files",
+        "100",
+        "http://s3:9000",
+        "us-west-2",
+        "s3-user",
+        "s3-pass",
+        "s3://bucket/base",
+    )
+    python_worker.assert_called_once_with(
+        worker_id="worker-1", host="localhost", output_port=5005
+    )
+    python_worker.return_value.run.assert_called_once()
+
+
+def test_main_mapping_is_independent_of_key_order():
+    """Reordering the JSON keys must not change where values land (it is a dict)."""
+    reordered = dict(reversed(list(_full_config().items())))
+    storage_patch, worker_patch, _logger_patch = _patched_collaborators()
+    with storage_patch as storage_config, worker_patch as python_worker, _logger_patch:
+        entry.main(json.dumps(reordered))
+
+    storage_config.initialize.assert_called_once_with(
+        "postgres",
+        "host:5432/db",
+        "pg-user",
+        "pg-pass",
+        "",
+        "",
+        "result_ns",
+        "state_ns",
+        "/tmp/files",
+        "100",
+        "http://s3:9000",
+        "us-west-2",
+        "s3-user",
+        "s3-pass",
+        "s3://bucket/base",
+    )
+    python_worker.assert_called_once_with(
+        worker_id="worker-1", host="localhost", output_port=5005
+    )
+
+
+def test_main_sets_r_home_when_r_path_present(monkeypatch):
+    monkeypatch.delenv("R_HOME", raising=False)
+    config = _full_config()
+    config["rPath"] = "/opt/R"
+    storage_patch, worker_patch, _logger_patch = _patched_collaborators()
+    with storage_patch, worker_patch, _logger_patch:
+        import os
+
+        entry.main(json.dumps(config))
+        assert os.environ["R_HOME"] == "/opt/R"
+
+
+@pytest.mark.parametrize("missing_key", sorted(_full_config().keys()))
+def test_parse_rejects_a_missing_key(missing_key):
+    """A missing key fails loudly rather than being silently misassigned."""
+    config = _full_config()
+    del config[missing_key]
+    with pytest.raises(ValueError, match="key mismatch"):
+        entry.parse_startup_config(json.dumps(config))
+
+
+def test_parse_rejects_an_unexpected_key():
+    """An extra key (e.g. the JVM side added a field) fails instead of being ignored."""
+    config = _full_config()
+    config["someNewField"] = "value"
+    with pytest.raises(ValueError, match="key mismatch"):
+        entry.parse_startup_config(json.dumps(config))
+
+
+def test_parse_rejects_a_non_string_value():
+    """A wrongly-typed value (e.g. a number instead of a string) fails."""
+    config = _full_config()
+    config["outputPort"] = 5005  # number instead of the expected string
+    with pytest.raises(TypeError, match="must be strings"):
+        entry.parse_startup_config(json.dumps(config))
+
+
+def test_parse_rejects_a_non_object_payload():
+    with pytest.raises(TypeError, match="must be a JSON object"):
+        entry.parse_startup_config(json.dumps(["not", "an", "object"]))
diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorkerStartupConfigSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorkerStartupConfigSpec.scala
new file mode 100644
index 0000000000..26f6cb937c
--- /dev/null
+++ b/amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorkerStartupConfigSpec.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.architecture.pythonworker
+
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+import org.scalatest.flatspec.AnyFlatSpec
+
+class PythonWorkflowWorkerStartupConfigSpec extends AnyFlatSpec {
+
+  "encodeStartupConfig" should "serialize entries to a JSON object keyed by name" in {
+    val json = PythonWorkflowWorker.encodeStartupConfig(
+      Seq("workerId" -> "w-1", "outputPort" -> "5005", "s3Region" -> "us-west-2")
+    )
+    val parsed = objectMapper.readValue(json, classOf[java.util.Map[String, String]])
+    assert(parsed.get("workerId") == "w-1")
+    assert(parsed.get("outputPort") == "5005")
+    assert(parsed.get("s3Region") == "us-west-2")
+    assert(parsed.size() == 3)
+  }
+
+  it should "fail loudly when the same key appears more than once" in {
+    val exception = intercept[IllegalArgumentException] {
+      PythonWorkflowWorker.encodeStartupConfig(
+        Seq("s3Region" -> "us-west-2", "s3Region" -> "us-east-1")
+      )
+    }
+    assert(exception.getMessage.contains("duplicate"))
+  }
+
+  private val expectedKeys = Set(
+    "workerId",
+    "outputPort",
+    "loggerLevel",
+    "rPath",
+    "icebergCatalogType",
+    "icebergPostgresCatalogUriWithoutScheme",
+    "icebergPostgresCatalogUsername",
+    "icebergPostgresCatalogPassword",
+    "icebergRestCatalogUri",
+    "icebergRestCatalogWarehouseName",
+    "icebergTableNamespace",
+    "icebergTableStateNamespace",
+    "icebergFileStorageDirectoryPath",
+    "icebergTableCommitBatchSize",
+    "s3Endpoint",
+    "s3Region",
+    "s3AuthUsername",
+    "s3AuthPassword",
+    "s3LargeBinariesBaseUri"
+  )
+
+  "buildStartupConfig" should "produce exactly the expected named keys with the worker values" in {
+    val config =
+      PythonWorkflowWorker.buildStartupConfig("worker-7", "6000", "/opt/R", "s3://bucket/uri")
+    val map = config.toMap
+
+    assert(config.size == expectedKeys.size, "no duplicate or missing keys")
+    assert(map.keySet == expectedKeys)
+    assert(map("workerId") == "worker-7")
+    assert(map("outputPort") == "6000")
+    assert(map("rPath") == "/opt/R")
+    assert(map("s3LargeBinariesBaseUri") == "s3://bucket/uri")
+  }
+
+  it should "produce a config that round-trips through encodeStartupConfig" in {
+    val json = PythonWorkflowWorker.encodeStartupConfig(
+      PythonWorkflowWorker.buildStartupConfig("w", "1", "", "uri")
+    )
+    val parsed = objectMapper.readValue(json, classOf[java.util.Map[String, String]])
+    assert(parsed.get("workerId") == "w")
+    assert(parsed.get("s3LargeBinariesBaseUri") == "uri")
+    assert(parsed.size() == expectedKeys.size)
+  }
+}
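The manual key-set comparison listed under "How was this PR tested?" could also be automated. The sketch below is hypothetical and not part of this commit. It assumes that every Scala key appears as a `"name" ->` pair, as in `buildStartupConfig`, and that the Python keys are the string literals assigned to `EXPECTED_CONFIG_KEYS`. Inline excerpts stand in for the two source files; a real check would read them from disk, and the regex is a heuristic rather than a Scala parser.

```python
import ast
import re

# Inline excerpts stand in for the two source files. Assumption: a real check
# would read PythonWorkflowWorker.scala and texera_run_python_worker.py from disk.
SCALA_EXCERPT = """
    Seq(
      "workerId" -> workerId,
      "outputPort" -> outputPort,
      "loggerLevel" -> UdfConfig.pythonLogStreamHandlerLevel
    )
"""

PYTHON_EXCERPT = """
EXPECTED_CONFIG_KEYS = frozenset({"workerId", "outputPort", "loggerLevel"})
"""

# Heuristic: a string literal followed by Scala's pair arrow, e.g. "workerId" ->
SCALA_KEY_PATTERN = re.compile(r'"(\w+)"\s*->')


def scala_keys(source: str) -> set:
    """Collect every key written as "name" -> value in the Scala source."""
    return set(SCALA_KEY_PATTERN.findall(source))


def python_keys(source: str) -> set:
    """Collect the string literals assigned to EXPECTED_CONFIG_KEYS."""
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Assign) and any(
            isinstance(target, ast.Name) and target.id == "EXPECTED_CONFIG_KEYS"
            for target in node.targets
        ):
            return {
                const.value
                for const in ast.walk(node.value)
                if isinstance(const, ast.Constant) and isinstance(const.value, str)
            }
    raise LookupError("EXPECTED_CONFIG_KEYS assignment not found")


def key_drift(scala_source: str, python_source: str) -> dict:
    """Report keys present on only one side; both lists are empty when in sync."""
    scala, python = scala_keys(scala_source), python_keys(python_source)
    return {
        "only_in_scala": sorted(scala - python),
        "only_in_python": sorted(python - scala),
    }


print(key_drift(SCALA_EXCERPT, PYTHON_EXCERPT))
# prints {'only_in_scala': [], 'only_in_python': []}
```

Renaming a key on only one side (for example `"outputPort"` to `"port"` in the Scala excerpt) makes both lists non-empty, which a CI job could turn into a failure.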
