This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 09fef3c3ee Update http to s3 system test (#35711)
09fef3c3ee is described below

commit 09fef3c3ee1d8902cd80a9e94609d7639d5b0402
Author: Niko Oliveira <[email protected]>
AuthorDate: Fri Nov 17 13:31:25 2023 -0800

    Update http to s3 system test (#35711)
    
    Previously the system test was not functional: it did not start or
    connect to a real http server to copy the file from, which does not
    work for a system-level test. This commit starts a simple local http
    server that serves a file from the local FS.
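    
    For context, the pattern this test sets up reduces to the following
    plain-Python sketch (outside Airflow; the port and file name mirror
    the test, and the final check is illustrative, not part of the
    commit):
    
        import subprocess
        import time
        import urllib.request
        
        # Write the file the server will expose, like the test's bash step.
        with open("/tmp/test_file", "w") as f:
            f.write("foo\n")
        
        # Serve /tmp over http, as the start_server BashOperator does.
        server = subprocess.Popen(
            ["python3", "-m", "http.server", "8083"],
            cwd="/tmp",
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        time.sleep(2)  # give the server a moment to bind, as the test does
        
        try:
            # The operator's GET against endpoint="/test_file" amounts to this.
            body = urllib.request.urlopen("http://localhost:8083/test_file").read()
            assert body == b"foo\n"
        finally:
            # Equivalent of the stop_simple_http_server teardown task.
            server.terminate()
            server.wait()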
---
 .../providers/amazon/aws/transfers/http_to_s3.py   |  2 +-
 .../providers/amazon/aws/example_http_to_s3.py     | 49 +++++++++++++++++++++-
 2 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/amazon/aws/transfers/http_to_s3.py b/airflow/providers/amazon/aws/transfers/http_to_s3.py
index ba6e63eadf..a401d36231 100644
--- a/airflow/providers/amazon/aws/transfers/http_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/http_to_s3.py
@@ -89,7 +89,7 @@ class HttpToS3Operator(BaseOperator):
                  CA cert bundle than the one used by botocore.
     """
 
-    template_fields: Sequence[str] = ("endpoint", "data", "headers", "s3_bucket", "s3_key")
+    template_fields: Sequence[str] = ("http_conn_id", "endpoint", "data", "headers", "s3_bucket", "s3_key")
     template_fields_renderers = {"headers": "json", "data": "py"}
     template_ext: Sequence[str] = ()
     ui_color = "#f4a460"
diff --git a/tests/system/providers/amazon/aws/example_http_to_s3.py b/tests/system/providers/amazon/aws/example_http_to_s3.py
index b5d65abe69..3654140b4a 100644
--- a/tests/system/providers/amazon/aws/example_http_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_http_to_s3.py
@@ -18,8 +18,12 @@ from __future__ import annotations
 
 from datetime import datetime
 
+from airflow import settings
+from airflow.decorators import task
 from airflow.models.baseoperator import chain
+from airflow.models.connection import Connection
 from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
 from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
 from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator
 from airflow.utils.trigger_rule import TriggerRule
@@ -29,6 +33,34 @@ sys_test_context_task = SystemTestContextBuilder().build()
 
 DAG_ID = "example_http_to_s3"
 
+cmd = """
+#!/bin/bash
+
+echo 'foo' > /tmp/test_file
+
+cd /tmp
+
+nohup python3 -m http.server 8083 > /dev/null 2>&1 &
+
+echo $!
+sleep 2
+exit 0
+"""
+
+
+@task
+def create_connection(conn_id_name: str):
+    conn = Connection(
+        conn_id=conn_id_name,
+        conn_type="http",
+        host="localhost",
+        port=8083,
+    )
+    session = settings.Session()
+    session.add(conn)
+    session.commit()
+
+
 with DAG(
     DAG_ID,
     schedule="@once",
@@ -39,21 +71,33 @@ with DAG(
     test_context = sys_test_context_task()
     env_id = test_context["ENV_ID"]
 
+    conn_id_name = f"{env_id}-conn-id"
     s3_bucket = f"{env_id}-http-to-s3-bucket"
     s3_key = f"{env_id}-http-to-s3-key"
 
     create_s3_bucket = S3CreateBucketOperator(task_id="create_s3_bucket", bucket_name=s3_bucket)
 
+    set_up_connection = create_connection(conn_id_name)
+
+    start_server = BashOperator(bash_command=cmd, task_id="start_server")
+
     # [START howto_transfer_http_to_s3]
     http_to_s3_task = HttpToS3Operator(
         task_id="http_to_s3_task",
-        endpoint="/tmp/http_path",
+        http_conn_id=conn_id_name,
+        endpoint="/test_file",
         s3_bucket=s3_bucket,
         s3_key=s3_key,
         replace=True,
     )
     # [END howto_transfer_http_to_s3]
 
+    stop_server = BashOperator(
+        task_id="stop_simple_http_server",
+        bash_command='kill {{ti.xcom_pull(task_ids="start_server")}}',
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
     delete_s3_bucket = S3DeleteBucketOperator(
         task_id="delete_s3_bucket",
         bucket_name=s3_bucket,
@@ -65,9 +109,12 @@ with DAG(
         # TEST SETUP
         test_context,
         create_s3_bucket,
+        set_up_connection,
+        start_server,
         # TEST BODY
         http_to_s3_task,
         # TEST TEARDOWN
+        stop_server,
         delete_s3_bucket,
     )
 

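The first hunk above also adds "http_conn_id" to template_fields, so the
connection id itself can now be rendered with Jinja at runtime. A minimal
sketch of what that enables (the DAG id, param, bucket, and key here are
hypothetical, not from the commit):

    from __future__ import annotations

    from datetime import datetime

    from airflow.models.dag import DAG
    from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator

    with DAG(
        "example_templated_http_conn_id",  # hypothetical DAG id
        schedule="@once",
        start_date=datetime(2023, 1, 1),
        params={"env_id": "dev"},  # hypothetical param
    ):
        # With "http_conn_id" in template_fields, the connection id is
        # rendered per run instead of being fixed at parse time.
        fetch = HttpToS3Operator(
            task_id="fetch",
            http_conn_id="{{ params.env_id }}-conn-id",
            endpoint="/test_file",
            s3_bucket="my-bucket",  # placeholder bucket
            s3_key="my-key",  # placeholder key
            replace=True,
        )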