GitHub user Muthukamalan closed a discussion: Selenium in Airflow

Hi Everyone,


I tried to create a simple DAG with the following tasks:
- **task1**:: create selenium (chrome) driver  
- **task2**:: navigate to other pages     

but I am facing an issue when pushing and pulling XComs.

> [!CAUTION]
>  Error creating driver: Can't pickle local object 
> '_createenviron.<locals>.encode'

As per [Stack Overflow](https://stackoverflow.com/questions/73775739/how-to-solve-this-problem-could-not-serialize-the-xcom-value-into-json), I modified the configuration 👇
```toml
xcom_backend=airflow.models.xcom.BaseXCom
enable_xcom_pickling = True
```

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
import os

# Constants
# Directory Chrome saves downloads into; adjust as needed.
DOWNLOAD_PATH = os.path.expanduser('~/Downloads')
# Page the navigation task opens.
AMAZON_URL = "https://www.amazon.com"

def create_chrome_driver(**context):
    """Create a configured Chrome WebDriver and push it to XCom.

    Builds the Chrome options (sandbox and /dev/shm usage disabled, download
    preferences pointing at ``DOWNLOAD_PATH``), starts the browser, sets a
    10-second implicit wait and pushes the driver under the XCom key
    ``'webdriver'``.

    Args:
        **context: Task context; ``context['task_instance']`` is used for
            the XCom push.

    Returns:
        The string ``"Driver created successfully"``.

    Raises:
        Exception: Any error raised while configuring, starting or pushing
            the driver is printed and re-raised. A driver that was already
            started is quit first so the browser process is not leaked.
    """
    print(context)
    # Bound before the try so the error path can tell whether a browser
    # was actually started.
    driver = None
    try:
        chrome_options = Options()
        chrome_options.add_argument('--no-sandbox')
        # chrome_options.add_argument('--headless')  # Run in headless mode
        chrome_options.add_argument('--disable-dev-shm-usage')

        # Configure download settings
        prefs = {
            "download.default_directory": DOWNLOAD_PATH,
            "download.prompt_for_download": False,
            "download.directory_upgrade": True,
            "safebrowsing.enabled": True,
        }
        chrome_options.add_experimental_option("prefs", prefs)

        # Initialize driver
        # service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(options=chrome_options)  # service=service,
        driver.implicitly_wait(10)

        # Store driver in XCom for next task.
        # NOTE(review): this push is where the reported
        # "Can't pickle local object" error surfaces; a live WebDriver
        # does not appear to be serializable, so this hand-off likely
        # needs a different design (e.g. one task owning the driver).
        context['task_instance'].xcom_push(key='webdriver', value=driver)
        return "Driver created successfully"

    except Exception as e:
        print(f"Error creating driver: {str(e)}")
        if driver is not None:
            # The browser was started but the task is failing: shut it
            # down so the process does not outlive the task.
            try:
                driver.quit()
            except Exception as quit_error:
                # Best-effort cleanup; the original error is what matters.
                print(f"Error quitting driver: {str(quit_error)}")
        raise

def navigate_to_amazon(**context):
    """Navigate to Amazon using the driver pulled from XCom.

    Pulls the value stored under key ``'webdriver'`` by task
    ``'create_driver'``, opens ``AMAZON_URL`` with it and checks that the
    page title contains ``"Amazon"``. The driver is always quit afterwards.

    Args:
        **context: Task context; ``context['task_instance']`` is used for
            the XCom pull.

    Returns:
        A message of the form ``"Successfully navigated to <url>"``.

    Raises:
        RuntimeError: If no driver was found in XCom.
        AssertionError: If the page title does not contain ``"Amazon"``.
        Exception: Any other error is printed and re-raised.
    """
    # Bound before the try so the finally clause can test it even when
    # xcom_pull itself raises; otherwise an UnboundLocalError would mask
    # the real failure.
    driver = None
    try:
        # Get driver from previous task
        driver = context['task_instance'].xcom_pull(
            task_ids='create_driver',
            key='webdriver',
        )
        if driver is None:
            raise RuntimeError(
                "No 'webdriver' XCom value found from task 'create_driver'"
            )
        print("driver init successfully")

        # Navigate to Amazon
        driver.get(AMAZON_URL)

        # Verify we're on Amazon (basic check). Raised explicitly rather
        # than via `assert`, which is stripped when Python runs with -O.
        if "Amazon" not in driver.title:
            raise AssertionError("Failed to navigate to Amazon")

        return f"Successfully navigated to {AMAZON_URL}"

    except Exception as e:
        print(f"Error navigating to Amazon: {str(e)}")
        raise
    finally:
        if driver is not None:
            driver.quit()

# DAG definition
# Defaults applied to every task in the DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'amazon_driver_test',
    default_args=default_args,
    description='Test DAG for creating WebDriver and navigating to Amazon',
    schedule_interval=None,  # Manual trigger only
    catchup=False,
) as dag:

    # NOTE: `provide_context=True` is intentionally not passed. It is
    # deprecated since Airflow 2.0; the task context is handed to any
    # callable that accepts **kwargs automatically.
    create_driver = PythonOperator(
        task_id='create_driver',
        python_callable=create_chrome_driver,
    )

    goto_amazon = PythonOperator(
        task_id='navigate_to_amazon',
        python_callable=navigate_to_amazon,
    )

    # Set task dependencies: the driver must exist before navigation runs.
    create_driver >> goto_amazon

```

This is what I am trying to achieve:

![selenium in airflow](https://github.com/user-attachments/assets/18dba12f-9b11-4e0f-8b26-2f69aeea9e87)

GitHub link: https://github.com/apache/airflow/discussions/43574

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to