This is an automated email from the ASF dual-hosted git repository.

yasith pushed a commit to branch resource-mgmt-rest-api
in repository https://gitbox.apache.org/repos/asf/airavata.git

commit 7e8c7a44dbba48078d60a145f7625339c1d45fce
Author: yasithdev <[email protected]>
AuthorDate: Fri Oct 31 13:42:08 2025 -0400

    improved error handling for failed experiment launch
---
 .../airavata_experiments/airavata.py               |  48 ++-
 dev-tools/batch_launch_experiments.py              | 334 ++++++++++++++++-----
 dev-tools/create_launch_experiment_with_storage.py |  62 +++-
 3 files changed, 352 insertions(+), 92 deletions(-)
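
The key change (first hunk below) bounds the SDK's previously unbounded polling loops and makes them fail fast when the experiment reaches a terminal state. A minimal standalone sketch of that pattern follows - it assumes the AiravataOperator methods (get_experiment_status, get_process_id) and the ExperimentState enum exactly as they are used in the diff; the helper name wait_for_process_id is illustrative and not part of the commit:

    # Illustrative sketch only; mirrors the bounded wait-for-process logic added below.
    import time
    from airavata.model.status.ttypes import ExperimentState  # import path assumed; adjust to the SDK layout

    TERMINAL_STATES = (ExperimentState.COMPLETED, ExperimentState.CANCELED, ExperimentState.FAILED)

    def wait_for_process_id(operator, experiment_name, ex_id, max_checks=300, interval=2):
        """Poll for a process id; raise on terminal experiment states or after max_checks * interval seconds."""
        for _ in range(max_checks):
            status = operator.get_experiment_status(ex_id)
            if status in TERMINAL_STATES:
                raise Exception(f"[AV] Experiment {experiment_name} reached terminal state {status.name} before a process started")
            try:
                process_id = operator.get_process_id(ex_id)
            except Exception:
                process_id = None  # process not registered yet; keep polling
            if process_id is not None:
                return process_id
            time.sleep(interval)
        raise Exception(f"[AV] Experiment {experiment_name} timed out waiting for a process to begin")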

diff --git a/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py b/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py
index e48e9c58c4..24961dcfc1 100644
--- a/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py
+++ b/dev-tools/airavata-python-sdk/airavata_experiments/airavata.py
@@ -703,25 +703,61 @@ class AiravataOperator:
     # wait until experiment begins, then get process id
     print(f"[AV] Experiment {experiment_name} WAITING until experiment 
begins...")
     process_id = None
-    while process_id is None:
+    max_wait_process = 300  # 10 minutes max wait for process
+    wait_count_process = 0
+    while process_id is None and wait_count_process < max_wait_process:
+      # Check experiment status - if failed, raise error
+      try:
+        status = self.get_experiment_status(ex_id)
+        if status == ExperimentState.FAILED:
+          raise Exception(f"[AV] Experiment {experiment_name} FAILED while 
waiting for process to begin")
+        if status in [ExperimentState.COMPLETED, ExperimentState.CANCELED]:
+          raise Exception(f"[AV] Experiment {experiment_name} reached terminal 
state {status.name} while waiting for process")
+      except Exception as status_err:
+        if "FAILED" in str(status_err) or "terminal state" in str(status_err):
+          raise status_err
+      
       try:
         process_id = self.get_process_id(ex_id)
       except:
+        pass
+      
+      if process_id is None:
         time.sleep(2)
-      else:
-        time.sleep(2)
+        wait_count_process += 1
+    
+    if process_id is None:
+      raise Exception(f"[AV] Experiment {experiment_name} timeout waiting for 
process to begin")
     print(f"[AV] Experiment {experiment_name} EXECUTING with pid: 
{process_id}")
 
     # wait until task begins, then get job id
     print(f"[AV] Experiment {experiment_name} WAITING until task begins...")
     job_id = job_state = None
-    while job_id in [None, "N/A"]:
+    max_wait_task = 300  # 10 minutes max wait for task
+    wait_count_task = 0
+    while job_id in [None, "N/A"] and wait_count_task < max_wait_task:
+      # Check experiment status - if failed, raise error
+      try:
+        status = self.get_experiment_status(ex_id)
+        if status == ExperimentState.FAILED:
+          raise Exception(f"[AV] Experiment {experiment_name} FAILED while 
waiting for task to begin")
+        if status in [ExperimentState.COMPLETED, ExperimentState.CANCELED]:
+          raise Exception(f"[AV] Experiment {experiment_name} reached terminal 
state {status.name} while waiting for task")
+      except Exception as status_err:
+        if "FAILED" in str(status_err) or "terminal state" in str(status_err):
+          raise status_err
+      
       try:
         job_id, job_state = self.get_task_status(ex_id)
       except:
+        pass
+      
+      if job_id in [None, "N/A"]:
         time.sleep(2)
-      else:
-        time.sleep(2)
+        wait_count_task += 1
+    
+    if job_id in [None, "N/A"]:
+      raise Exception(f"[AV] Experiment {experiment_name} timeout waiting for 
task to begin")
     assert job_state is not None, f"Job state is None for job id: {job_id}"
     print(f"[AV] Experiment {experiment_name} - Task {job_state.name} with id: 
{job_id}")
 
diff --git a/dev-tools/batch_launch_experiments.py b/dev-tools/batch_launch_experiments.py
index 3449c0e0da..8edaa47a7d 100644
--- a/dev-tools/batch_launch_experiments.py
+++ b/dev-tools/batch_launch_experiments.py
@@ -12,7 +12,7 @@ from rich.console import Console
 from rich.live import Live
 from rich.panel import Panel
 from rich.layout import Layout
-from io import StringIO
+from collections import deque
 
 os.environ['AUTH_SERVER_URL'] = "https://auth.dev.cybershuttle.org"
 os.environ['API_SERVER_HOSTNAME'] = "api.dev.cybershuttle.org"
@@ -64,19 +64,86 @@ class JobResult(pydantic.BaseModel):
     error: str | None = None
 
 
+def get_experiment_state_value(status) -> tuple[int, str, ExperimentState]:
+    """Extract state value, name, and enum from status. Returns (value, name, 
enum)."""
+    if isinstance(status, ExperimentState):
+        return status.value, status.name, status
+    
+    # Handle ExperimentStatus object
+    if hasattr(status, 'state'):
+        state = status.state
+        if isinstance(state, ExperimentState):
+            return state.value, state.name, state
+        elif hasattr(state, 'value'):
+            return state.value, state.name if hasattr(state, 'name') else str(state), state
+    
+    # Handle direct value/name access
+    status_value = status.value if hasattr(status, 'value') else (status if isinstance(status, int) else None)
+    status_name = status.name if hasattr(status, 'name') else str(status)
+    
+    # Convert to ExperimentState enum
+    if status_value is not None:
+        try:
+            enum_state = ExperimentState(status_value)
+            return status_value, status_name, enum_state
+        except (ValueError, TypeError):
+            pass
+    
+    # Fallback
+    return None, status_name, ExperimentState.FAILED
+
+
 def monitor_experiment_silent(operator: AiravataOperator, experiment_id: str, check_interval: int = 30) -> ExperimentState:
     """Monitor experiment silently until completion. Returns final status."""
-    while True:
-        status = operator.get_experiment_status(experiment_id)
-        
-        if status in [
-            ExperimentState.COMPLETED,
-            ExperimentState.CANCELED,
-            ExperimentState.FAILED,
-        ]:
-            return status
+    logger = logging.getLogger(__name__)
+    max_checks = 3600  # Maximum number of checks (~30 hours at the default 30s interval)
+    check_count = 0
+    
+    # Use shorter interval initially, then increase
+    initial_interval = min(check_interval, 5)  # Check every 5 seconds initially
+    
+    while check_count < max_checks:
+        try:
+            status = operator.get_experiment_status(experiment_id)
+            
+            # Extract state information
+            status_value, status_name, status_enum = get_experiment_state_value(status)
+            
+            # Log status periodically for debugging
+            if check_count % 12 == 0:  # Log every minute (12 * 5s)
+                logger.debug(f"Experiment {experiment_id} status check 
{check_count}: value={status_value}, name={status_name}")
+            
+            # Check terminal states: COMPLETED (7), CANCELED (6), FAILED (8)
+            if status_value is not None:
+                is_terminal = status_value in [
+                    ExperimentState.COMPLETED.value,  # 7
+                    ExperimentState.CANCELED.value,   # 6
+                    ExperimentState.FAILED.value      # 8
+                ]
+            else:
+                is_terminal = status_name in ['COMPLETED', 'CANCELED', 'FAILED']
+            
+            if is_terminal:
+                logger.info(f"Experiment {experiment_id} reached terminal 
state: {status_name} (value: {status_value})")
+                return status_enum
+            
+        except Exception as e:
+            # If we can't get status, log but continue monitoring
+            logger.warning(f"Error checking experiment {experiment_id} status 
(check {check_count}): {e}")
+            import traceback
+            logger.debug(traceback.format_exc())
+            if check_count > 10:  # After several failed checks, assume failed
+                logger.error(f"Multiple status check failures for 
{experiment_id}, assuming FAILED")
+                return ExperimentState.FAILED
         
-        time.sleep(check_interval)
+        # Sleep before next check
+        sleep_time = initial_interval if check_count < 6 else check_interval
+        time.sleep(sleep_time)
+        check_count += 1
+    
+    # If we've exceeded max checks, assume failed
+    logger.error(f"Experiment {experiment_id} monitoring timeout after 
{check_count} checks, assuming FAILED")
+    return ExperimentState.FAILED
 
 
 def submit_and_monitor_job(
@@ -90,9 +157,17 @@ def submit_and_monitor_job(
         job_config = JobConfig(**job_config)
     
     try:
+        # Make experiment name unique for each job to avoid directory conflicts
+        # Using job_index ensures uniqueness and makes it easy to track
+        unique_experiment_name = f"{job_config.experiment_name}-job{job_index}"
+        
+        # Handle input_files and data_inputs same way as working version
+        input_files = job_config.input_files if job_config.input_files else None
+        data_inputs = job_config.data_inputs if job_config.data_inputs else None
+        
         result_dict = create_and_launch_experiment(
             access_token=access_token,
-            experiment_name=job_config.experiment_name,
+            experiment_name=unique_experiment_name,
             project_name=job_config.project_name,
             application_name=job_config.application_name,
             computation_resource_name=job_config.computation_resource_name,
@@ -103,15 +178,39 @@ def submit_and_monitor_job(
             group_name=job_config.group_name,
             input_storage_host=job_config.input_storage_host,
             output_storage_host=job_config.output_storage_host,
-            input_files=job_config.input_files,
-            data_inputs=job_config.data_inputs,
+            input_files=input_files,
+            data_inputs=data_inputs,
             gateway_id=job_config.gateway_id,
             auto_schedule=job_config.auto_schedule,
             monitor=False,
         )
         
         operator = AiravataOperator(access_token=access_token)
-        final_status = monitor_experiment_silent(operator, result_dict['experiment_id'])
+        experiment_id = result_dict['experiment_id']
+        
+        # Check status immediately after submission to catch early failures
+        try:
+            initial_status = operator.get_experiment_status(experiment_id)
+            status_value, status_name, status_enum = get_experiment_state_value(initial_status)
+            
+            # Check if already in terminal state
+            if status_value is not None and status_value in [
+                ExperimentState.COMPLETED.value,
+                ExperimentState.CANCELED.value,
+                ExperimentState.FAILED.value
+            ]:
+                # Already in terminal state
+                final_status = status_enum
+            else:
+                # Monitor until completion
+                final_status = monitor_experiment_silent(operator, experiment_id)
+        except Exception as e:
+            # If we can't check status, log and assume failed
+            logger = logging.getLogger(__name__)
+            logger.error(f"Error monitoring experiment {experiment_id}: {e}")
+            import traceback
+            logger.debug(traceback.format_exc())
+            final_status = ExperimentState.FAILED
         
         result = ExperimentLaunchResult(**result_dict)
         
@@ -123,6 +222,12 @@ def submit_and_monitor_job(
             success=final_status == ExperimentState.COMPLETED,
         )
     except Exception as e:
+        # Log the error for debugging
+        import traceback
+        error_msg = f"{str(e)}\n{traceback.format_exc()}"
+        logger = logging.getLogger(__name__)
+        logger.error(f"Job {job_index} failed: {error_msg}")
+        
         return JobResult(
             job_index=job_index,
             experiment_id=None,
@@ -146,46 +251,68 @@ def batch_submit_jobs(
     
     console = Console()
     results = []
-    log_buffer = StringIO()
+    log_buffer = deque(maxlen=50)  # Keep last 50 log lines for display
     
-    # Add handler to capture logs (without reconfiguring)
-    log_handler = logging.StreamHandler(log_buffer)
+    # Custom handler to capture logs to buffer
+    class ListHandler(logging.Handler):
+        def __init__(self, buffer):
+            super().__init__()
+            self.buffer = buffer
+        
+        def emit(self, record):
+            msg = self.format(record)
+            self.buffer.append(msg)
+    
+    log_handler = ListHandler(log_buffer)
     log_handler.setLevel(logging.INFO)
+    log_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
+    
+    # Add to root logger and module logger
     logging.root.addHandler(log_handler)
+    logger = logging.getLogger('create_launch_experiment_with_storage')
+    logger.addHandler(log_handler)
     
-    try:
-        # Configure progress bar
-        progress = Progress(
-            SpinnerColumn(),
-            TextColumn("[progress.description]{task.description}"),
-            BarColumn(),
-            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
-            TextColumn("•"),
-            TextColumn("{task.completed}/{task.total}"),
-            TimeElapsedColumn(),
-            console=console,
-        )
-        
-        task = progress.add_task(
-            f"Jobs: 0/{num_copies} completed, 0 running",
-            total=num_copies
-        )
-        
-        # Create layout with logs above and progress below
-        layout = Layout()
-        layout.split_column(
-            Layout(name="logs", size=None),
-            Layout(progress, name="progress", size=3)
+    # Configure progress bar
+    progress = Progress(
+        SpinnerColumn(),
+        TextColumn("[progress.description]{task.description}"),
+        BarColumn(),
+        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
+        TextColumn("•"),
+        TextColumn("{task.completed}/{task.total}"),
+        TimeElapsedColumn(),
+        console=console,
+    )
+    
+    task = progress.add_task(
+        f"{num_copies} total, 0 running, 0 completed, 0 failed",
+        total=num_copies
+    )
+    
+    # Create layout with logs above and progress below
+    layout = Layout()
+    layout.split_column(
+        Layout(name="logs", size=None),
+        Layout(progress, name="progress", size=3)
+    )
+    
+    def make_display():
+        # Get logs from buffer - always show the latest logs (they're added to end of deque)
+        log_lines = list(log_buffer) if log_buffer else ["No logs yet..."]
+        # Show last 20 lines to keep display manageable and scrolled to bottom
+        display_lines = log_lines[-20:] if len(log_lines) > 20 else log_lines
+        log_text = '\n'.join(display_lines)
+        log_panel = Panel(
+            log_text, 
+            title="Logs (latest)", 
+            border_style="blue", 
+            height=None,
+            expand=False
         )
-        
-        def make_display():
-            log_content = log_buffer.getvalue()
-            log_lines = log_content.split('\n')[-20:]  # Last 20 lines
-            log_panel = Panel('\n'.join(log_lines), title="Logs", border_style="blue")
-            layout["logs"].update(log_panel)
-            layout["progress"].update(progress)
-            return layout
-        
+        layout["logs"].update(log_panel)
+        return layout
+    
+    try:
         # Use Live to keep layout fixed, progress at bottom
         with Live(make_display(), console=console, refresh_per_second=4, screen=True) as live:
             with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
@@ -199,38 +326,77 @@ def batch_submit_jobs(
                     next_job_index += 1
                 
                 # Process completed jobs and submit new ones
-                while active_futures:
+                # Continue until all jobs are submitted AND all active futures are done
+                while active_futures or next_job_index < num_copies:
                     completed_futures = [f for f in active_futures if f.done()]
                     
                     for future in completed_futures:
-                        results.append(future.result())
-                        active_futures.pop(future)
+                        job_idx = active_futures.pop(future)
                         
-                        # Submit next job if available
-                        if next_job_index < num_copies:
+                        try:
+                            result = future.result()
+                            results.append(result)
+                        except Exception as e:
+                            # Handle unexpected exceptions
+                            results.append(JobResult(
+                                job_index=job_idx,
+                                experiment_id=None,
+                                status='ERROR',
+                                result=None,
+                                success=False,
+                                error=str(e),
+                            ))
+                    
+                    # Submit next jobs if available and we have capacity
+                    while next_job_index < num_copies and len(active_futures) < max_concurrent:
+                        try:
                             new_future = executor.submit(submit_and_monitor_job, next_job_index, job_config, access_token)
                             active_futures[new_future] = next_job_index
                             next_job_index += 1
+                        except Exception as e:
+                            # If submission itself fails, mark as error and continue
+                            results.append(JobResult(
+                                job_index=next_job_index,
+                                experiment_id=None,
+                                status='ERROR',
+                                result=None,
+                                success=False,
+                                error=f"Submission failed: {str(e)}",
+                            ))
+                            next_job_index += 1
                     
-                    # Update progress bar and refresh display
+                    # Update progress bar with counts
                     completed_count = len(results)
                     running_count = len(active_futures)
+                    submitted_count = next_job_index
+                    successful_count = sum(1 for r in results if r.success)
+                    failed_count = completed_count - successful_count
+                    
+                    # Show submitted count if not all jobs submitted yet
+                    if submitted_count < num_copies:
+                        status_desc = f"{num_copies} total, {submitted_count} 
submitted, {running_count} running, {completed_count} completed, {failed_count} 
failed"
+                    else:
+                        status_desc = f"{num_copies} total, {running_count} 
running, {completed_count} completed, {failed_count} failed"
+                    
                     progress.update(
                         task,
                         completed=completed_count,
-                        description=f"Jobs: {completed_count}/{num_copies} 
completed, {running_count} running"
+                        description=status_desc
                     )
                     live.update(make_display())
                     
-                    if not completed_futures:
+                    if not completed_futures and next_job_index >= num_copies:
+                        # Only sleep if nothing changed
                         time.sleep(1)
-        
+    
         # Sort results by job_index
         results.sort(key=lambda x: x.job_index)
         return results
     finally:
-        # Clean up log handler
+        # Clean up log handlers
         logging.root.removeHandler(log_handler)
+        if log_handler in logger.handlers:
+            logger.removeHandler(log_handler)
 
 
 def main():
@@ -239,23 +405,39 @@ def main():
     
     access_token = AuthContext.get_access_token()
     
-    # Job configuration
+    # Job configuration - matching create_launch_experiment_with_storage.py exactly
+    EXPERIMENT_NAME = "Test"
+    PROJECT_NAME = "Default Project"
+    APPLICATION_NAME = "NAMD-test"
+    GATEWAY_ID = None
+    COMPUTATION_RESOURCE_NAME = "NeuroData25VC2"
+    QUEUE_NAME = "cloud"
+    NODE_COUNT = 1
+    CPU_COUNT = 1
+    WALLTIME = 5
+    GROUP_NAME = "Default"
+    INPUT_STORAGE_HOST = "gateway.dev.cybershuttle.org"
+    OUTPUT_STORAGE_HOST = "149.165.169.12"
+    INPUT_FILES = {}
+    DATA_INPUTS = {}
+    AUTO_SCHEDULE = False
+    
     job_config = JobConfig(
-        experiment_name='Test',
-        project_name='Default Project',
-        application_name='NAMD-test',
-        computation_resource_name='NeuroData25VC2',
-        queue_name='cloud',
-        node_count=1,
-        cpu_count=1,
-        walltime=5,
-        group_name='Default',
-        input_storage_host='gateway.dev.cybershuttle.org',
-        output_storage_host='149.165.169.12',
-        input_files=None,
-        data_inputs=None,
-        gateway_id=None,
-        auto_schedule=False,
+        experiment_name=EXPERIMENT_NAME,
+        project_name=PROJECT_NAME,
+        application_name=APPLICATION_NAME,
+        computation_resource_name=COMPUTATION_RESOURCE_NAME,
+        queue_name=QUEUE_NAME,
+        node_count=NODE_COUNT,
+        cpu_count=CPU_COUNT,
+        walltime=WALLTIME,
+        group_name=GROUP_NAME,
+        input_storage_host=INPUT_STORAGE_HOST,
+        output_storage_host=OUTPUT_STORAGE_HOST,
+        input_files=INPUT_FILES if INPUT_FILES else None,
+        data_inputs=DATA_INPUTS if DATA_INPUTS else None,
+        gateway_id=GATEWAY_ID,
+        auto_schedule=AUTO_SCHEDULE,
     )
     
     num_copies = 10
diff --git a/dev-tools/create_launch_experiment_with_storage.py b/dev-tools/create_launch_experiment_with_storage.py
index 95f66a957a..7d333e1234 100755
--- a/dev-tools/create_launch_experiment_with_storage.py
+++ b/dev-tools/create_launch_experiment_with_storage.py
@@ -141,20 +141,62 @@ def create_and_launch_experiment(
     return result
 
 
+def get_experiment_state_value(status) -> tuple[int, str, ExperimentState]:
+    """Extract state value, name, and enum from status. Returns (value, name, 
enum)."""
+    if isinstance(status, ExperimentState):
+        return status.value, status.name, status
+    
+    # Handle ExperimentStatus object
+    if hasattr(status, 'state'):
+        state = status.state
+        if isinstance(state, ExperimentState):
+            return state.value, state.name, state
+        elif hasattr(state, 'value'):
+            return state.value, state.name if hasattr(state, 'name') else str(state), state
+    
+    # Handle direct value/name access
+    status_value = status.value if hasattr(status, 'value') else (status if isinstance(status, int) else None)
+    status_name = status.name if hasattr(status, 'name') else str(status)
+    
+    # Convert to ExperimentState enum
+    if status_value is not None:
+        try:
+            enum_state = ExperimentState(status_value)
+            return status_value, status_name, enum_state
+        except (ValueError, TypeError):
+            pass
+    
+    # Fallback
+    return None, status_name, ExperimentState.FAILED
+
+
 def monitor_experiment(operator: AiravataOperator, experiment_id: str, check_interval: int = 30):
     logger.info(f"Monitoring experiment {experiment_id}...")
     
     while True:
-        status = operator.get_experiment_status(experiment_id)
-        logger.info(f"Experiment status: {status.name}")
-        
-        if status in [
-            ExperimentState.COMPLETED,
-            ExperimentState.CANCELED,
-            ExperimentState.FAILED,
-        ]:
-            logger.info(f"Experiment finished with state: {status.name}")
-            break
+        try:
+            status = operator.get_experiment_status(experiment_id)
+            status_value, status_name, status_enum = get_experiment_state_value(status)
+            logger.info(f"Experiment status: {status_name} (value: 
{status_value})")
+            
+            # Check terminal states: COMPLETED (7), CANCELED (6), FAILED (8)
+            if status_value is not None:
+                is_terminal = status_value in [
+                    ExperimentState.COMPLETED.value,  # 7
+                    ExperimentState.CANCELED.value,   # 6
+                    ExperimentState.FAILED.value      # 8
+                ]
+            else:
+                is_terminal = status_name in ['COMPLETED', 'CANCELED', 'FAILED']
+            
+            if is_terminal:
+                logger.info(f"Experiment finished with state: {status_name}")
+                break
+        except Exception as e:
+            logger.error(f"Error checking experiment {experiment_id} status: 
{e}")
+            import traceback
+            logger.debug(traceback.format_exc())
+            # Continue monitoring despite errors
         
         time.sleep(check_interval)
 
