gemini-code-assist[bot] commented on code in PR #38701:
URL: https://github.com/apache/beam/pull/38701#discussion_r3307231570
##########
sdks/python/apache_beam/ml/inference/vllm_inference.py:
##########
@@ -109,46 +111,214 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI:
)
+# Embedded Dynamo runtime defaults proven on the smoke test: etcd discovery,
+# TCP request plane, ZMQ event plane, KV events disabled. KV-aware routing,
+# disaggregated prefill/decode, and the Planner are not active in this mode.
+_DYNAMO_FRONTEND_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'router-mode': 'round-robin',
+ 'no-router-kv-events': None,
+}
+
+_DYNAMO_ENGINE_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'kv-events-config': '{"enable_kv_cache_events": false}',
+}
+
+
+def _append_kwargs(cmd: list[str], kwargs: dict[str, Optional[str]]) -> None:
+ for k, v in kwargs.items():
+ cmd.append(f'--{k}')
+ # Only add values for commands with value part.
+ if v is not None:
+ cmd.append(v)
+
+
+def _uses_etcd_discovery(kwargs: dict[str, Optional[str]]) -> bool:
+ return kwargs.get('discovery-backend') == 'etcd'
+
+
class _VLLMModelServer():
- def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]):
+ def __init__(
+ self,
+ model_name: str,
+ vllm_server_kwargs: dict[str, Optional[str]],
+ dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None,
+ use_dynamo: bool = False):
self._model_name = model_name
self._vllm_server_kwargs = vllm_server_kwargs
+ self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
self._server_started = False
self._server_process = None
+ self._dynamo_process = None
+ self._etcd_process = None
+ self._managed_etcd_endpoint = None
self._server_port: int = -1
self._server_process_lock = threading.RLock()
+ self._use_dynamo = use_dynamo
Review Comment:

Initialize `self._etcd_data_dir` to `None` in `__init__` to track the
temporary etcd data directory for proper cleanup.
```suggestion
self._model_name = model_name
self._vllm_server_kwargs = vllm_server_kwargs
self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
self._server_started = False
self._server_process = None
self._dynamo_process = None
self._etcd_process = None
self._etcd_data_dir = None
self._managed_etcd_endpoint = None
self._server_port: int = -1
self._server_process_lock = threading.RLock()
self._use_dynamo = use_dynamo
```
##########
sdks/python/apache_beam/ml/inference/vllm_inference.py:
##########
@@ -109,46 +111,214 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI:
)
+# Embedded Dynamo runtime defaults proven on the smoke test: etcd discovery,
+# TCP request plane, ZMQ event plane, KV events disabled. KV-aware routing,
+# disaggregated prefill/decode, and the Planner are not active in this mode.
+_DYNAMO_FRONTEND_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'router-mode': 'round-robin',
+ 'no-router-kv-events': None,
+}
+
+_DYNAMO_ENGINE_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'kv-events-config': '{"enable_kv_cache_events": false}',
+}
+
+
+def _append_kwargs(cmd: list[str], kwargs: dict[str, Optional[str]]) -> None:
+ for k, v in kwargs.items():
+ cmd.append(f'--{k}')
+ # Only add values for commands with value part.
+ if v is not None:
+ cmd.append(v)
+
+
+def _uses_etcd_discovery(kwargs: dict[str, Optional[str]]) -> bool:
+ return kwargs.get('discovery-backend') == 'etcd'
+
+
class _VLLMModelServer():
- def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]):
+ def __init__(
+ self,
+ model_name: str,
+ vllm_server_kwargs: dict[str, Optional[str]],
+ dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None,
+ use_dynamo: bool = False):
self._model_name = model_name
self._vllm_server_kwargs = vllm_server_kwargs
+ self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
self._server_started = False
self._server_process = None
+ self._dynamo_process = None
+ self._etcd_process = None
+ self._managed_etcd_endpoint = None
self._server_port: int = -1
self._server_process_lock = threading.RLock()
+ self._use_dynamo = use_dynamo
self.start_server()
+ @staticmethod
+ def _stop_process(process: Optional[subprocess.Popen]) -> None:
+ if process is None or process.poll() is not None:
+ return
+ process.terminate()
+ try:
+ process.wait(timeout=10)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ process.wait()
+
+ def _stop_processes(self) -> None:
+ self._stop_process(self._dynamo_process)
+ self._stop_process(self._server_process)
+ self._stop_process(self._etcd_process)
+ if (self._managed_etcd_endpoint is not None and
+ os.environ.get('ETCD_ENDPOINTS') == self._managed_etcd_endpoint):
+ del os.environ['ETCD_ENDPOINTS']
+ self._dynamo_process = None
+ self._server_process = None
+ self._etcd_process = None
+ self._managed_etcd_endpoint = None
+ self._server_started = False
+ self._server_port = -1
Review Comment:

To prevent disk-space leaks in `/tmp` from abandoned etcd data directories,
and to avoid a potential `AttributeError` or `NameError` during interpreter
shutdown (when module globals such as `os` or `shutil` may already have been
cleared to `None` before `__del__` runs), we should:
1. Clean up the temporary etcd data directory using `shutil.rmtree` in
`_stop_processes`.
2. Import `os` and `shutil` locally inside `_stop_processes` to ensure they
are available during interpreter shutdown.
3. Use `local_os.environ.pop` to safely remove the environment variable
without raising a `KeyError`.
```python
def _stop_processes(self) -> None:
import os as local_os
import shutil as local_shutil
self._stop_process(self._dynamo_process)
self._stop_process(self._server_process)
self._stop_process(self._etcd_process)
if (self._managed_etcd_endpoint is not None and
local_os.environ.get('ETCD_ENDPOINTS') ==
self._managed_etcd_endpoint):
local_os.environ.pop('ETCD_ENDPOINTS', None)
if getattr(self, '_etcd_data_dir', None):
local_shutil.rmtree(self._etcd_data_dir, ignore_errors=True)
self._etcd_data_dir = None
self._dynamo_process = None
self._server_process = None
self._etcd_process = None
self._managed_etcd_endpoint = None
self._server_started = False
self._server_port = -1
```
##########
sdks/python/apache_beam/ml/inference/vllm_inference.py:
##########
@@ -109,46 +111,214 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI:
)
+# Embedded Dynamo runtime defaults proven on the smoke test: etcd discovery,
+# TCP request plane, ZMQ event plane, KV events disabled. KV-aware routing,
+# disaggregated prefill/decode, and the Planner are not active in this mode.
+_DYNAMO_FRONTEND_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'router-mode': 'round-robin',
+ 'no-router-kv-events': None,
+}
+
+_DYNAMO_ENGINE_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'kv-events-config': '{"enable_kv_cache_events": false}',
+}
+
+
+def _append_kwargs(cmd: list[str], kwargs: dict[str, Optional[str]]) -> None:
+ for k, v in kwargs.items():
+ cmd.append(f'--{k}')
+ # Only add values for commands with value part.
+ if v is not None:
+ cmd.append(v)
+
+
+def _uses_etcd_discovery(kwargs: dict[str, Optional[str]]) -> bool:
+ return kwargs.get('discovery-backend') == 'etcd'
+
+
class _VLLMModelServer():
- def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]):
+ def __init__(
+ self,
+ model_name: str,
+ vllm_server_kwargs: dict[str, Optional[str]],
+ dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None,
+ use_dynamo: bool = False):
self._model_name = model_name
self._vllm_server_kwargs = vllm_server_kwargs
+ self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
self._server_started = False
self._server_process = None
+ self._dynamo_process = None
+ self._etcd_process = None
+ self._managed_etcd_endpoint = None
self._server_port: int = -1
self._server_process_lock = threading.RLock()
+ self._use_dynamo = use_dynamo
self.start_server()
+ @staticmethod
+ def _stop_process(process: Optional[subprocess.Popen]) -> None:
+ if process is None or process.poll() is not None:
+ return
+ process.terminate()
+ try:
+ process.wait(timeout=10)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ process.wait()
+
+ def _stop_processes(self) -> None:
+ self._stop_process(self._dynamo_process)
+ self._stop_process(self._server_process)
+ self._stop_process(self._etcd_process)
+ if (self._managed_etcd_endpoint is not None and
+ os.environ.get('ETCD_ENDPOINTS') == self._managed_etcd_endpoint):
+ del os.environ['ETCD_ENDPOINTS']
+ self._dynamo_process = None
+ self._server_process = None
+ self._etcd_process = None
+ self._managed_etcd_endpoint = None
+ self._server_started = False
+ self._server_port = -1
+
+ def _process_status(self) -> str:
+ process_status = []
+ if self._server_process is not None:
+ process_status.append(
+ 'frontend/server exit code: %s' % self._server_process.poll())
+ if self._dynamo_process is not None:
+ process_status.append(
+ 'dynamo worker exit code: %s' % self._dynamo_process.poll())
+ if self._etcd_process is not None:
+ process_status.append('etcd exit code: %s' % self._etcd_process.poll())
+ return ', '.join(process_status) or 'no process status available'
+
+ def __del__(self):
+ self._stop_processes()
+
+ def _uses_embedded_etcd(self) -> bool:
+ return (
+ self._use_dynamo and
+ _uses_etcd_discovery(self._dynamo_frontend_kwargs) and
+ _uses_etcd_discovery(self._vllm_server_kwargs) and
+ 'ETCD_ENDPOINTS' not in os.environ)
+
+ def _wait_for_etcd(self, endpoint: str, timeout_secs=30) -> None:
+ deadline = time.time() + timeout_secs
+ health_url = endpoint.rstrip('/') + '/health'
+ while time.time() < deadline and self._etcd_process.poll() is None:
+ try:
+ with urllib.request.urlopen(health_url, timeout=2) as response:
+ if response.status < 500:
+ return
+ except Exception: # pylint: disable=broad-except
+ time.sleep(1)
+
+ process_status = self._process_status()
+ self._stop_processes()
+ raise RuntimeError(
+ "Failed to start embedded etcd for Dynamo. Process status: "
+ f"{process_status}. Install etcd in the worker container or set "
+ "ETCD_ENDPOINTS to an external etcd service.")
+
+ def _ensure_etcd(self) -> None:
+ if not self._uses_embedded_etcd():
+ return
+ if shutil.which('etcd') is None:
+ raise RuntimeError(
+ "Embedded Dynamo mode requires etcd when ETCD_ENDPOINTS is not "
+ "set. Install etcd in the worker container or set ETCD_ENDPOINTS "
+ "to an external etcd service.")
+
+ etcd_name = f'beam-dynamo-etcd-{uuid.uuid4().hex}'
+ etcd_data_dir = f'/tmp/{etcd_name}'
+ peer_port, = subprocess_server.pick_port(None)
+ etcd_cmd = [
+ 'etcd',
+ '--name',
+ etcd_name,
+ '--listen-client-urls',
+ 'http://127.0.0.1:{{PORT}}',
+ '--advertise-client-urls',
+ 'http://127.0.0.1:{{PORT}}',
+ '--listen-peer-urls',
+ f'http://127.0.0.1:{peer_port}',
+ '--initial-advertise-peer-urls',
+ f'http://127.0.0.1:{peer_port}',
+ '--initial-cluster',
+ f'{etcd_name}=http://127.0.0.1:{peer_port}',
+ '--data-dir',
+ etcd_data_dir,
+ '--log-level',
+ 'warn',
+ ]
Review Comment:

Store the temporary etcd data directory path in `self._etcd_data_dir`
instead of a local variable so that it can be cleaned up when stopping the
processes.
```python
etcd_name = f'beam-dynamo-etcd-{uuid.uuid4().hex}'
self._etcd_data_dir = f'/tmp/{etcd_name}'
peer_port, = subprocess_server.pick_port(None)
etcd_cmd = [
'etcd',
'--name',
etcd_name,
'--listen-client-urls',
'http://127.0.0.1:{{PORT}}',
'--advertise-client-urls',
'http://127.0.0.1:{{PORT}}',
'--listen-peer-urls',
f'http://127.0.0.1:{peer_port}',
'--initial-advertise-peer-urls',
f'http://127.0.0.1:{peer_port}',
'--initial-cluster',
f'{etcd_name}=http://127.0.0.1:{peer_port}',
'--data-dir',
self._etcd_data_dir,
'--log-level',
'warn',
]
```
##########
sdks/python/apache_beam/ml/inference/vllm_inference.py:
##########
@@ -109,46 +111,214 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI:
)
+# Embedded Dynamo runtime defaults proven on the smoke test: etcd discovery,
+# TCP request plane, ZMQ event plane, KV events disabled. KV-aware routing,
+# disaggregated prefill/decode, and the Planner are not active in this mode.
+_DYNAMO_FRONTEND_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'router-mode': 'round-robin',
+ 'no-router-kv-events': None,
+}
+
+_DYNAMO_ENGINE_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'kv-events-config': '{"enable_kv_cache_events": false}',
+}
+
+
+def _append_kwargs(cmd: list[str], kwargs: dict[str, Optional[str]]) -> None:
+ for k, v in kwargs.items():
+ cmd.append(f'--{k}')
+ # Only add values for commands with value part.
+ if v is not None:
+ cmd.append(v)
+
+
+def _uses_etcd_discovery(kwargs: dict[str, Optional[str]]) -> bool:
+ return kwargs.get('discovery-backend') == 'etcd'
+
+
class _VLLMModelServer():
- def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]):
+ def __init__(
+ self,
+ model_name: str,
+ vllm_server_kwargs: dict[str, Optional[str]],
+ dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None,
+ use_dynamo: bool = False):
self._model_name = model_name
self._vllm_server_kwargs = vllm_server_kwargs
+ self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
self._server_started = False
self._server_process = None
+ self._dynamo_process = None
+ self._etcd_process = None
+ self._managed_etcd_endpoint = None
self._server_port: int = -1
self._server_process_lock = threading.RLock()
+ self._use_dynamo = use_dynamo
self.start_server()
+ @staticmethod
+ def _stop_process(process: Optional[subprocess.Popen]) -> None:
+ if process is None or process.poll() is not None:
+ return
+ process.terminate()
+ try:
+ process.wait(timeout=10)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ process.wait()
Review Comment:

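Wrap the process termination sequence in a `try...except OSError` block. If
a process exits concurrently between the `poll()` check and
`terminate()`/`kill()`, calling these methods can raise an `OSError` (such as
`ProcessLookupError`; CPython 3.9+ already suppresses that particular race
inside `Popen.send_signal()`, but guarding here keeps cleanup robust across
versions and platforms). An uncaught error would crash the cleanup sequence
and leave the other processes running.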
```suggestion
@staticmethod
def _stop_process(process: Optional[subprocess.Popen]) -> None:
if process is None or process.poll() is not None:
return
try:
process.terminate()
try:
process.wait(timeout=10)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
except OSError:
pass
```
##########
sdks/python/apache_beam/ml/inference/vllm_inference.py:
##########
@@ -109,46 +111,214 @@ def getAsyncVLLMClient(port) -> AsyncOpenAI:
)
+# Embedded Dynamo runtime defaults proven on the smoke test: etcd discovery,
+# TCP request plane, ZMQ event plane, KV events disabled. KV-aware routing,
+# disaggregated prefill/decode, and the Planner are not active in this mode.
+_DYNAMO_FRONTEND_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'router-mode': 'round-robin',
+ 'no-router-kv-events': None,
+}
+
+_DYNAMO_ENGINE_DEFAULT_KWARGS: dict[str, Optional[str]] = {
+ 'discovery-backend': 'etcd',
+ 'request-plane': 'tcp',
+ 'event-plane': 'zmq',
+ 'kv-events-config': '{"enable_kv_cache_events": false}',
+}
+
+
+def _append_kwargs(cmd: list[str], kwargs: dict[str, Optional[str]]) -> None:
+ for k, v in kwargs.items():
+ cmd.append(f'--{k}')
+ # Only add values for commands with value part.
+ if v is not None:
+ cmd.append(v)
+
+
+def _uses_etcd_discovery(kwargs: dict[str, Optional[str]]) -> bool:
+ return kwargs.get('discovery-backend') == 'etcd'
+
+
class _VLLMModelServer():
- def __init__(self, model_name: str, vllm_server_kwargs: dict[str, str]):
+ def __init__(
+ self,
+ model_name: str,
+ vllm_server_kwargs: dict[str, Optional[str]],
+ dynamo_frontend_kwargs: Optional[dict[str, Optional[str]]] = None,
+ use_dynamo: bool = False):
self._model_name = model_name
self._vllm_server_kwargs = vllm_server_kwargs
+ self._dynamo_frontend_kwargs = dynamo_frontend_kwargs or {}
self._server_started = False
self._server_process = None
+ self._dynamo_process = None
+ self._etcd_process = None
+ self._managed_etcd_endpoint = None
self._server_port: int = -1
self._server_process_lock = threading.RLock()
+ self._use_dynamo = use_dynamo
self.start_server()
+ @staticmethod
+ def _stop_process(process: Optional[subprocess.Popen]) -> None:
+ if process is None or process.poll() is not None:
+ return
+ process.terminate()
+ try:
+ process.wait(timeout=10)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ process.wait()
+
+ def _stop_processes(self) -> None:
+ self._stop_process(self._dynamo_process)
+ self._stop_process(self._server_process)
+ self._stop_process(self._etcd_process)
+ if (self._managed_etcd_endpoint is not None and
+ os.environ.get('ETCD_ENDPOINTS') == self._managed_etcd_endpoint):
+ del os.environ['ETCD_ENDPOINTS']
+ self._dynamo_process = None
+ self._server_process = None
+ self._etcd_process = None
+ self._managed_etcd_endpoint = None
+ self._server_started = False
+ self._server_port = -1
+
+ def _process_status(self) -> str:
+ process_status = []
+ if self._server_process is not None:
+ process_status.append(
+ 'frontend/server exit code: %s' % self._server_process.poll())
+ if self._dynamo_process is not None:
+ process_status.append(
+ 'dynamo worker exit code: %s' % self._dynamo_process.poll())
+ if self._etcd_process is not None:
+ process_status.append('etcd exit code: %s' % self._etcd_process.poll())
+ return ', '.join(process_status) or 'no process status available'
+
+ def __del__(self):
+ self._stop_processes()
+
+ def _uses_embedded_etcd(self) -> bool:
+ return (
+ self._use_dynamo and
+ _uses_etcd_discovery(self._dynamo_frontend_kwargs) and
+ _uses_etcd_discovery(self._vllm_server_kwargs) and
+ 'ETCD_ENDPOINTS' not in os.environ)
+
+ def _wait_for_etcd(self, endpoint: str, timeout_secs=30) -> None:
+ deadline = time.time() + timeout_secs
+ health_url = endpoint.rstrip('/') + '/health'
+ while time.time() < deadline and self._etcd_process.poll() is None:
+ try:
+ with urllib.request.urlopen(health_url, timeout=2) as response:
+ if response.status < 500:
+ return
+ except Exception: # pylint: disable=broad-except
+ time.sleep(1)
+
+ process_status = self._process_status()
+ self._stop_processes()
+ raise RuntimeError(
+ "Failed to start embedded etcd for Dynamo. Process status: "
+ f"{process_status}. Install etcd in the worker container or set "
+ "ETCD_ENDPOINTS to an external etcd service.")
+
+ def _ensure_etcd(self) -> None:
+ if not self._uses_embedded_etcd():
+ return
+ if shutil.which('etcd') is None:
+ raise RuntimeError(
+ "Embedded Dynamo mode requires etcd when ETCD_ENDPOINTS is not "
+ "set. Install etcd in the worker container or set ETCD_ENDPOINTS "
+ "to an external etcd service.")
+
+ etcd_name = f'beam-dynamo-etcd-{uuid.uuid4().hex}'
+ etcd_data_dir = f'/tmp/{etcd_name}'
+ peer_port, = subprocess_server.pick_port(None)
+ etcd_cmd = [
+ 'etcd',
+ '--name',
+ etcd_name,
+ '--listen-client-urls',
+ 'http://127.0.0.1:{{PORT}}',
+ '--advertise-client-urls',
+ 'http://127.0.0.1:{{PORT}}',
+ '--listen-peer-urls',
+ f'http://127.0.0.1:{peer_port}',
+ '--initial-advertise-peer-urls',
+ f'http://127.0.0.1:{peer_port}',
+ '--initial-cluster',
+ f'{etcd_name}=http://127.0.0.1:{peer_port}',
+ '--data-dir',
+ etcd_data_dir,
+ '--log-level',
+ 'warn',
+ ]
+ self._etcd_process, etcd_port = start_process(etcd_cmd)
+ endpoint = f'http://127.0.0.1:{etcd_port}'
+ os.environ['ETCD_ENDPOINTS'] = endpoint
+ self._managed_etcd_endpoint = endpoint
+ self._wait_for_etcd(endpoint)
+
def start_server(self, retries=3):
with self._server_process_lock:
if not self._server_started:
- server_cmd = [
- sys.executable,
- '-m',
- 'vllm.entrypoints.openai.api_server',
- '--model',
- self._model_name,
- '--port',
- '{{PORT}}',
- ]
- for k, v in self._vllm_server_kwargs.items():
- server_cmd.append(f'--{k}')
- # Only add values for commands with value part.
- if v is not None:
- server_cmd.append(v)
+ self._stop_processes()
+ self._ensure_etcd()
+ if self._use_dynamo:
+ # Dynamo embedded mode uses the frontend as its OpenAI-compatible
+ # local endpoint and a separate vLLM worker process.
+ server_cmd = [
+ sys.executable,
+ '-m',
+ 'dynamo.frontend',
+ '--http-port',
+ '{{PORT}}',
+ ]
+ _append_kwargs(server_cmd, self._dynamo_frontend_kwargs)
+ else:
+ server_cmd = [
+ sys.executable,
+ '-m',
+ 'vllm.entrypoints.openai.api_server',
+ '--model',
+ self._model_name,
+ '--port',
+ '{{PORT}}',
+ ]
+ _append_kwargs(server_cmd, self._vllm_server_kwargs)
self._server_process, self._server_port = start_process(server_cmd)
+ if self._use_dynamo:
+ server_cmd = [
+ sys.executable,
+ '-m',
+ 'dynamo.vllm',
+ '--model',
+ self._model_name,
+ ]
+ _append_kwargs(server_cmd, self._vllm_server_kwargs)
+ self._dynamo_process, _ = start_process(server_cmd)
+
self.check_connectivity(retries)
def get_server_port(self) -> int:
if not self._server_started:
self.start_server()
return self._server_port
- def check_connectivity(self, retries=3):
+ def check_connectivity(self, retries=3, timeout_secs=600):
+ start_time = time.time()
with getVLLMClient(self._server_port) as client:
- while self._server_process.poll() is None:
+ while (time.time() - start_time < timeout_secs and
+ self._server_process.poll() is None and
+ (self._dynamo_process is None or
+ self._dynamo_process.poll() is None)):
Review Comment:

Add a check for `self._etcd_process` in the connectivity polling loop. If
the embedded etcd process dies, the frontend and engine can no longer
discover each other, and the server will fail. Checking it here lets the loop
fail fast instead of waiting for the full 10-minute (`timeout_secs=600`) timeout.
```python
while (time.time() - start_time < timeout_secs and
self._server_process.poll() is None and
(self._dynamo_process is None or
self._dynamo_process.poll() is None) and
(self._etcd_process is None or
self._etcd_process.poll() is None)):
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]