This is an automated email from the ASF dual-hosted git repository.

yasith pushed a commit to branch resource-mgmt-rest-api
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 1dac7e87c48d1f5ea5ef8588170aa21249d3eba8
Author: yasithdev <[email protected]>
AuthorDate: Fri Oct 31 13:42:08 2025 -0400

    improved error handling for failed experiment launch
---
 .../airavata_experiments/airavata.py               |  48 ++-
 dev-tools/batch_launch_experiments.py              | 334 ++++++++++++++++-----
 dev-tools/create_launch_experiment_with_storage.py |  62 +++-
 3 files changed, 352 insertions(+), 92 deletions(-)

diff --git a/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py b/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py
index e48e9c58c4..24961dcfc1 100644
--- a/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py
+++ b/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py
@@ -703,25 +703,61 @@ class AiravataOperator:
         # wait until experiment begins, then get process id
         print(f"[AV] Experiment {experiment_name} WAITING until experiment begins...")
         process_id = None
-        while process_id is None:
+        max_wait_process = 300  # 10 minutes max wait for process
+        wait_count_process = 0
+        while process_id is None and wait_count_process < max_wait_process:
+            # Check experiment status - if failed, raise error
+            try:
+                status = self.get_experiment_status(ex_id)
+                if status == ExperimentState.FAILED:
+                    raise Exception(f"[AV] Experiment {experiment_name} FAILED while waiting for process to begin")
+                if status in [ExperimentState.COMPLETED, ExperimentState.CANCELED]:
+                    raise Exception(f"[AV] Experiment {experiment_name} reached terminal state {status.name} while waiting for process")
+            except Exception as status_err:
+                if "FAILED" in str(status_err) or "terminal state" in str(status_err):
+                    raise status_err
+
             try:
                 process_id = self.get_process_id(ex_id)
             except:
+                pass
+
+            if process_id is None:
                 time.sleep(2)
-            else:
-                time.sleep(2)
+            wait_count_process += 1
+
+        if process_id is None:
+            raise Exception(f"[AV] Experiment {experiment_name} timeout waiting for process to begin")
         print(f"[AV] Experiment {experiment_name} EXECUTING with pid: {process_id}")
 
         # wait until task begins, then get job id
         print(f"[AV] Experiment {experiment_name} WAITING until task begins...")
         job_id = job_state = None
-        while job_id in [None, "N/A"]:
+        max_wait_task = 300  # 10 minutes max wait for task
+        wait_count_task = 0
+        while job_id in [None, "N/A"] and wait_count_task < max_wait_task:
+            # Check experiment status - if failed, raise error
+            try:
+                status = self.get_experiment_status(ex_id)
+                if status == ExperimentState.FAILED:
+                    raise Exception(f"[AV] Experiment {experiment_name} FAILED while waiting for task to begin")
+                if status in [ExperimentState.COMPLETED, ExperimentState.CANCELED]:
+                    raise Exception(f"[AV] Experiment {experiment_name} reached terminal state {status.name} while waiting for task")
+            except Exception as status_err:
+                if "FAILED" in str(status_err) or "terminal state" in str(status_err):
+                    raise status_err
+
             try:
                 job_id, job_state = self.get_task_status(ex_id)
             except:
+                pass
+
+            if job_id in [None, "N/A"]:
                 time.sleep(2)
-            else:
-                time.sleep(2)
+            wait_count_task += 1
+
+        if job_id in [None, "N/A"]:
+            raise Exception(f"[AV] Experiment {experiment_name} timeout waiting for task to begin")
 
         assert job_state is not None, f"Job state is None for job id: {job_id}"
         print(f"[AV] Experiment {experiment_name} - Task {job_state.name} with id: {job_id}")
diff --git a/dev-tools/batch_launch_experiments.py b/dev-tools/batch_launch_experiments.py
index 3449c0e0da..8edaa47a7d 100644
--- a/dev-tools/batch_launch_experiments.py
+++ b/dev-tools/batch_launch_experiments.py
@@ -12,7 +12,7 @@ from rich.console import Console
 from rich.live import Live
 from rich.panel import Panel
 from rich.layout import Layout
-from io import StringIO
+from collections import deque
 
 os.environ['AUTH_SERVER_URL'] = "https://auth.dev.cybershuttle.org"
 os.environ['API_SERVER_HOSTNAME'] = "api.dev.cybershuttle.org"
@@ -64,19 +64,86 @@ class JobResult(pydantic.BaseModel):
     error: str | None = None
 
 
+def get_experiment_state_value(status) -> tuple[int, str, ExperimentState]:
+    """Extract state value, name, and enum from status. Returns (value, name, enum)."""
+    if isinstance(status, ExperimentState):
+        return status.value, status.name, status
+
+    # Handle ExperimentStatus object
+    if hasattr(status, 'state'):
+        state = status.state
+        if isinstance(state, ExperimentState):
+            return state.value, state.name, state
+        elif hasattr(state, 'value'):
+            return state.value, state.name if hasattr(state, 'name') else str(state), state
+
+    # Handle direct value/name access
+    status_value = status.value if hasattr(status, 'value') else (status if isinstance(status, int) else None)
+    status_name = status.name if hasattr(status, 'name') else str(status)
+
+    # Convert to ExperimentState enum
+    if status_value is not None:
+        try:
+            enum_state = ExperimentState(status_value)
+            return status_value, status_name, enum_state
+        except (ValueError, TypeError):
+            pass
+
+    # Fallback
+    return None, status_name, ExperimentState.FAILED
+
+
 def monitor_experiment_silent(operator: AiravataOperator, experiment_id: str, check_interval: int = 30) -> ExperimentState:
     """Monitor experiment silently until completion. Returns final status."""
-    while True:
-        status = operator.get_experiment_status(experiment_id)
-
-        if status in [
-            ExperimentState.COMPLETED,
-            ExperimentState.CANCELED,
-            ExperimentState.FAILED,
-        ]:
-            return status
+    logger = logging.getLogger(__name__)
+    max_checks = 3600  # Maximum number of checks (about 5 hours at 5s interval)
+    check_count = 0
+
+    # Use shorter interval initially, then increase
+    initial_interval = min(check_interval, 5)  # Check every 5 seconds initially
+
+    while check_count < max_checks:
+        try:
+            status = operator.get_experiment_status(experiment_id)
+
+            # Extract state information
+            status_value, status_name, status_enum = get_experiment_state_value(status)
+
+            # Log status periodically for debugging
+            if check_count % 12 == 0:  # Log every minute (12 * 5s)
+                logger.debug(f"Experiment {experiment_id} status check {check_count}: value={status_value}, name={status_name}")
+
+            # Check terminal states: COMPLETED (7), CANCELED (6), FAILED (8)
+            if status_value is not None:
+                is_terminal = status_value in [
+                    ExperimentState.COMPLETED.value,  # 7
+                    ExperimentState.CANCELED.value,   # 6
+                    ExperimentState.FAILED.value      # 8
+                ]
+            else:
+                is_terminal = status_name in ['COMPLETED', 'CANCELED', 'FAILED']
+
+            if is_terminal:
+                logger.info(f"Experiment {experiment_id} reached terminal state: {status_name} (value: {status_value})")
+                return status_enum
+
+        except Exception as e:
+            # If we can't get status, log but continue monitoring
+            logger.warning(f"Error checking experiment {experiment_id} status (check {check_count}): {e}")
+            import traceback
+            logger.debug(traceback.format_exc())
+            if check_count > 10:  # After several failed checks, assume failed
+                logger.error(f"Multiple status check failures for {experiment_id}, assuming FAILED")
+                return ExperimentState.FAILED
 
-        time.sleep(check_interval)
+        # Sleep before next check
+        sleep_time = initial_interval if check_count < 6 else check_interval
+        time.sleep(sleep_time)
+        check_count += 1
+
+    # If we've exceeded max checks, assume failed
+    logger.error(f"Experiment {experiment_id} monitoring timeout after {check_count} checks, assuming FAILED")
+    return ExperimentState.FAILED
 
 
 def submit_and_monitor_job(
@@ -90,9 +157,17 @@
     job_config = JobConfig(**job_config)
 
     try:
+        # Make experiment name unique for each job to avoid directory conflicts
+        # Using job_index ensures uniqueness and makes it easy to track
+        unique_experiment_name = f"{job_config.experiment_name}-job{job_index}"
+
+        # Handle input_files and data_inputs same way as working version
+        input_files = job_config.input_files if job_config.input_files else None
+        data_inputs = job_config.data_inputs if job_config.data_inputs else None
+
         result_dict = create_and_launch_experiment(
             access_token=access_token,
-            experiment_name=job_config.experiment_name,
+            experiment_name=unique_experiment_name,
             project_name=job_config.project_name,
             application_name=job_config.application_name,
             computation_resource_name=job_config.computation_resource_name,
@@ -103,15 +178,39 @@
             group_name=job_config.group_name,
             input_storage_host=job_config.input_storage_host,
             output_storage_host=job_config.output_storage_host,
-            input_files=job_config.input_files,
-            data_inputs=job_config.data_inputs,
+            input_files=input_files,
+            data_inputs=data_inputs,
             gateway_id=job_config.gateway_id,
             auto_schedule=job_config.auto_schedule,
             monitor=False,
         )
 
         operator = AiravataOperator(access_token=access_token)
-        final_status = monitor_experiment_silent(operator, result_dict['experiment_id'])
+        experiment_id = result_dict['experiment_id']
+
+        # Check status immediately after submission to catch early failures
+        try:
+            initial_status = operator.get_experiment_status(experiment_id)
+            status_value, status_name, status_enum = get_experiment_state_value(initial_status)
+
+            # Check if already in terminal state
+            if status_value is not None and status_value in [
+                ExperimentState.COMPLETED.value,
+                ExperimentState.CANCELED.value,
+                ExperimentState.FAILED.value
+            ]:
+                # Already in terminal state
+                final_status = status_enum
+            else:
+                # Monitor until completion
+                final_status = monitor_experiment_silent(operator, experiment_id)
+        except Exception as e:
+            # If we can't check status, log and assume failed
+            logger = logging.getLogger(__name__)
+            logger.error(f"Error monitoring experiment {experiment_id}: {e}")
+            import traceback
+            logger.debug(traceback.format_exc())
+            final_status = ExperimentState.FAILED
 
         result = ExperimentLaunchResult(**result_dict)
 
@@ -123,6 +222,12 @@
             success=final_status == ExperimentState.COMPLETED,
         )
     except Exception as e:
+        # Log the error for debugging
+        import traceback
+        error_msg = f"{str(e)}\n{traceback.format_exc()}"
+        logger = logging.getLogger(__name__)
+        logger.error(f"Job {job_index} failed: {error_msg}")
+
         return JobResult(
             job_index=job_index,
             experiment_id=None,
@@ -146,46 +251,68 @@
     console = Console()
     results = []
 
-    log_buffer = StringIO()
+    log_buffer = deque(maxlen=50)  # Keep last 50 log lines for display
 
-    # Add handler to capture logs (without reconfiguring)
-    log_handler = logging.StreamHandler(log_buffer)
+    # Custom handler to capture logs to buffer
+    class ListHandler(logging.Handler):
+        def __init__(self, buffer):
+            super().__init__()
+            self.buffer = buffer
+
+        def emit(self, record):
+            msg = self.format(record)
+            self.buffer.append(msg)
+
+    log_handler = ListHandler(log_buffer)
     log_handler.setLevel(logging.INFO)
+    log_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
+
+    # Add to root logger and module logger
     logging.root.addHandler(log_handler)
+    logger = logging.getLogger('create_launch_experiment_with_storage')
+    logger.addHandler(log_handler)
 
-    try:
-        # Configure progress bar
-        progress = Progress(
-            SpinnerColumn(),
-            TextColumn("[progress.description]{task.description}"),
-            BarColumn(),
-            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
-            TextColumn("•"),
-            TextColumn("{task.completed}/{task.total}"),
-            TimeElapsedColumn(),
-            console=console,
-        )
-
-        task = progress.add_task(
-            f"Jobs: 0/{num_copies} completed, 0 running",
-            total=num_copies
-        )
-
-        # Create layout with logs above and progress below
-        layout = Layout()
-        layout.split_column(
-            Layout(name="logs", size=None),
-            Layout(progress, name="progress", size=3)
+    # Configure progress bar
+    progress = Progress(
+        SpinnerColumn(),
+        TextColumn("[progress.description]{task.description}"),
+        BarColumn(),
+        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
+        TextColumn("•"),
+        TextColumn("{task.completed}/{task.total}"),
+        TimeElapsedColumn(),
+        console=console,
+    )
+
+    task = progress.add_task(
+        f"{num_copies} total, 0 running, 0 completed, 0 failed",
+        total=num_copies
+    )
+
+    # Create layout with logs above and progress below
+    layout = Layout()
+    layout.split_column(
+        Layout(name="logs", size=None),
+        Layout(progress, name="progress", size=3)
+    )
+
+    def make_display():
+        # Get logs from buffer - always show the latest logs (they're added to end of deque)
+        log_lines = list(log_buffer) if log_buffer else ["No logs yet..."]
+        # Show last 20 lines to keep display manageable and scrolled to bottom
+        display_lines = log_lines[-20:] if len(log_lines) > 20 else log_lines
+        log_text = '\n'.join(display_lines)
+        log_panel = Panel(
+            log_text,
+            title="Logs (latest)",
+            border_style="blue",
+            height=None,
+            expand=False
         )
-
-        def make_display():
-            log_content = log_buffer.getvalue()
-            log_lines = log_content.split('\n')[-20:]  # Last 20 lines
-            log_panel = Panel('\n'.join(log_lines), title="Logs", border_style="blue")
-            layout["logs"].update(log_panel)
-            layout["progress"].update(progress)
-            return layout
-
+        layout["logs"].update(log_panel)
+        return layout
+
+    try:
         # Use Live to keep layout fixed, progress at bottom
         with Live(make_display(), console=console, refresh_per_second=4, screen=True) as live:
             with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
@@ -199,38 +326,77 @@
                     next_job_index += 1
 
                 # Process completed jobs and submit new ones
-                while active_futures:
+                # Continue until all jobs are submitted AND all active futures are done
+                while active_futures or next_job_index < num_copies:
                     completed_futures = [f for f in active_futures if f.done()]
 
                     for future in completed_futures:
-                        results.append(future.result())
-                        active_futures.pop(future)
+                        job_idx = active_futures.pop(future)
 
-                        # Submit next job if available
-                        if next_job_index < num_copies:
+                        try:
+                            result = future.result()
+                            results.append(result)
+                        except Exception as e:
+                            # Handle unexpected exceptions
+                            results.append(JobResult(
+                                job_index=job_idx,
+                                experiment_id=None,
+                                status='ERROR',
+                                result=None,
+                                success=False,
+                                error=str(e),
+                            ))
+
+                    # Submit next jobs if available and we have capacity
+                    while next_job_index < num_copies and len(active_futures) < max_concurrent:
+                        try:
                             new_future = executor.submit(submit_and_monitor_job, next_job_index, job_config, access_token)
                             active_futures[new_future] = next_job_index
                             next_job_index += 1
+                        except Exception as e:
+                            # If submission itself fails, mark as error and continue
+                            results.append(JobResult(
+                                job_index=next_job_index,
+                                experiment_id=None,
+                                status='ERROR',
+                                result=None,
+                                success=False,
+                                error=f"Submission failed: {str(e)}",
+                            ))
+                            next_job_index += 1
 
-                    # Update progress bar and refresh display
+                    # Update progress bar with counts
                     completed_count = len(results)
                     running_count = len(active_futures)
+                    submitted_count = next_job_index
+                    successful_count = sum(1 for r in results if r.success)
+                    failed_count = completed_count - successful_count
+
+                    # Show submitted count if not all jobs submitted yet
+                    if submitted_count < num_copies:
+                        status_desc = f"{num_copies} total, {submitted_count} submitted, {running_count} running, {completed_count} completed, {failed_count} failed"
+                    else:
+                        status_desc = f"{num_copies} total, {running_count} running, {completed_count} completed, {failed_count} failed"
+
                     progress.update(
                         task,
                        completed=completed_count,
-                        description=f"Jobs: {completed_count}/{num_copies} completed, {running_count} running"
+                        description=status_desc
                     )
                    live.update(make_display())
 
-                    if not completed_futures:
+                    if not completed_futures and next_job_index >= num_copies:
+                        # Only sleep if nothing changed
                         time.sleep(1)
-
+
         # Sort results by job_index
         results.sort(key=lambda x: x.job_index)
         return results
     finally:
-        # Clean up log handler
+        # Clean up log handlers
         logging.root.removeHandler(log_handler)
+        if log_handler in logger.handlers:
+            logger.removeHandler(log_handler)
 
 
 def main():
@@ -239,23 +405,39 @@
 
     access_token = AuthContext.get_access_token()
 
-    # Job configuration
+    # Job configuration - matching create_launch_experiment_with_storage.py exactly
+    EXPERIMENT_NAME = "Test"
+    PROJECT_NAME = "Default Project"
+    APPLICATION_NAME = "NAMD-test"
+    GATEWAY_ID = None
+    COMPUTATION_RESOURCE_NAME = "NeuroData25VC2"
+    QUEUE_NAME = "cloud"
+    NODE_COUNT = 1
+    CPU_COUNT = 1
+    WALLTIME = 5
+    GROUP_NAME = "Default"
+    INPUT_STORAGE_HOST = "gateway.dev.cybershuttle.org"
+    OUTPUT_STORAGE_HOST = "149.165.169.12"
+    INPUT_FILES = {}
+    DATA_INPUTS = {}
+    AUTO_SCHEDULE = False
+
     job_config = JobConfig(
-        experiment_name='Test',
-        project_name='Default Project',
-        application_name='NAMD-test',
-        computation_resource_name='NeuroData25VC2',
-        queue_name='cloud',
-        node_count=1,
-        cpu_count=1,
-        walltime=5,
-        group_name='Default',
-        input_storage_host='gateway.dev.cybershuttle.org',
-        output_storage_host='149.165.169.12',
-        input_files=None,
-        data_inputs=None,
-        gateway_id=None,
-        auto_schedule=False,
+        experiment_name=EXPERIMENT_NAME,
+        project_name=PROJECT_NAME,
+        application_name=APPLICATION_NAME,
+        computation_resource_name=COMPUTATION_RESOURCE_NAME,
+        queue_name=QUEUE_NAME,
+        node_count=NODE_COUNT,
+        cpu_count=CPU_COUNT,
+        walltime=WALLTIME,
+        group_name=GROUP_NAME,
+        input_storage_host=INPUT_STORAGE_HOST,
+        output_storage_host=OUTPUT_STORAGE_HOST,
+        input_files=INPUT_FILES if INPUT_FILES else None,
+        data_inputs=DATA_INPUTS if DATA_INPUTS else None,
+        gateway_id=GATEWAY_ID,
+        auto_schedule=AUTO_SCHEDULE,
    )
 
     num_copies = 10
diff --git a/dev-tools/create_launch_experiment_with_storage.py b/dev-tools/create_launch_experiment_with_storage.py
index 95f66a957a..7d333e1234 100755
--- a/dev-tools/create_launch_experiment_with_storage.py
+++ b/dev-tools/create_launch_experiment_with_storage.py
@@ -141,20 +141,62 @@ def create_and_launch_experiment(
     return result
 
 
+def get_experiment_state_value(status) -> tuple[int, str, ExperimentState]:
+    """Extract state value, name, and enum from status. Returns (value, name, enum)."""
+    if isinstance(status, ExperimentState):
+        return status.value, status.name, status
+
+    # Handle ExperimentStatus object
+    if hasattr(status, 'state'):
+        state = status.state
+        if isinstance(state, ExperimentState):
+            return state.value, state.name, state
+        elif hasattr(state, 'value'):
+            return state.value, state.name if hasattr(state, 'name') else str(state), state
+
+    # Handle direct value/name access
+    status_value = status.value if hasattr(status, 'value') else (status if isinstance(status, int) else None)
+    status_name = status.name if hasattr(status, 'name') else str(status)
+
+    # Convert to ExperimentState enum
+    if status_value is not None:
+        try:
+            enum_state = ExperimentState(status_value)
+            return status_value, status_name, enum_state
+        except (ValueError, TypeError):
+            pass
+
+    # Fallback
+    return None, status_name, ExperimentState.FAILED
+
+
 def monitor_experiment(operator: AiravataOperator, experiment_id: str, check_interval: int = 30):
     logger.info(f"Monitoring experiment {experiment_id}...")
 
     while True:
-        status = operator.get_experiment_status(experiment_id)
-        logger.info(f"Experiment status: {status.name}")
-
-        if status in [
-            ExperimentState.COMPLETED,
-            ExperimentState.CANCELED,
-            ExperimentState.FAILED,
-        ]:
-            logger.info(f"Experiment finished with state: {status.name}")
-            break
+        try:
+            status = operator.get_experiment_status(experiment_id)
+            status_value, status_name, status_enum = get_experiment_state_value(status)
+            logger.info(f"Experiment status: {status_name} (value: {status_value})")
+
+            # Check terminal states: COMPLETED (7), CANCELED (6), FAILED (8)
+            if status_value is not None:
+                is_terminal = status_value in [
+                    ExperimentState.COMPLETED.value,  # 7
+                    ExperimentState.CANCELED.value,   # 6
+                    ExperimentState.FAILED.value      # 8
+                ]
+            else:
+                is_terminal = status_name in ['COMPLETED', 'CANCELED', 'FAILED']
+
+            if is_terminal:
+                logger.info(f"Experiment finished with state: {status_name}")
+                break
+        except Exception as e:
+            logger.error(f"Error checking experiment {experiment_id} status: {e}")
+            import traceback
+            logger.debug(traceback.format_exc())
+            # Continue monitoring despite errors
 
         time.sleep(check_interval)
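
For reviewers who want to exercise the change outside the SDK: the loops this commit touches all follow the same bounded-polling pattern — query the experiment state on an interval, stop on a terminal state (COMPLETED, CANCELED, FAILED), tolerate transient status-check errors, and give up after a fixed check budget. The sketch below is illustrative only; poll_until_terminal and the caller-supplied get_state callable are placeholders, not functions added by this commit.

    import time

    TERMINAL_STATES = {"COMPLETED", "CANCELED", "FAILED"}

    def poll_until_terminal(get_state, check_interval=5, max_checks=300):
        """Poll get_state() until it reports a terminal state name or the budget runs out."""
        for _ in range(max_checks):
            try:
                state = get_state()  # e.g. a wrapper around operator.get_experiment_status(...)
            except Exception:
                state = None         # transient error: keep polling
            if state in TERMINAL_STATES:
                return state
            time.sleep(check_interval)
        return "FAILED"              # budget exhausted: assume failure, as the commit's loops do

One way to drive it would be to reuse the helper added in this commit, e.g. poll_until_terminal(lambda: get_experiment_state_value(operator.get_experiment_status(ex_id))[1]), since the helper's second return value is the state name.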
