pankajkoti commented on code in PR #22808:
URL: https://github.com/apache/airflow/pull/22808#discussion_r848059666
##########
airflow/providers/google/cloud/example_dags/example_gcs.py:
##########
@@ -37,35 +37,44 @@
from airflow.providers.google.cloud.sensors.gcs import (
GCSObjectExistenceSensor,
GCSObjectsWithPrefixExistenceSensor,
+ GCSObjectUpdateSensor,
+ GCSUploadSessionCompleteSensor,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import
GCSToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import
GCSToLocalFilesystemOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import
LocalFilesystemToGCSOperator
START_DATE = datetime(2021, 1, 1)
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
-BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
-GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
+PROJECT_ID = os.getenv("GCP_PROJECT_ID", "example-id")
+BUCKET_1 = os.getenv("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
+GCS_ACL_ENTITY = os.getenv("GCS_ACL_ENTITY", "allUsers")
GCS_ACL_BUCKET_ROLE = "OWNER"
GCS_ACL_OBJECT_ROLE = "OWNER"
-BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
+BUCKET_2 = os.getenv("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
temp_dir_path = gettempdir()
-PATH_TO_TRANSFORM_SCRIPT = os.environ.get(
+PATH_TO_TRANSFORM_SCRIPT = os.getenv(
"GCP_GCS_PATH_TO_TRANSFORM_SCRIPT", os.path.join(temp_dir_path,
"transform_script.py")
)
-PATH_TO_UPLOAD_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE = os.getenv(
"GCP_GCS_PATH_TO_UPLOAD_FILE", os.path.join(temp_dir_path,
"test-gcs-example-upload.txt")
)
-PATH_TO_UPLOAD_FILE_PREFIX =
os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
-PATH_TO_SAVED_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE_PREFIX = os.getenv("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX",
"test-gcs-")
+PATH_TO_SAVED_FILE = os.getenv(
"GCP_GCS_PATH_TO_SAVED_FILE", os.path.join(temp_dir_path,
"test-gcs-example-download.txt")
)
BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
+# Upload 'test-gcs-manual-example-upload.txt' manually in the <BUCKET_1> after
triggering the DAG.
Review Comment:
Hi @bhirsz, I wanted to upload the file programmatically, like the other
chain. However, the sensor task GCSUploadSessionCompleteSensor waits for a
change in the number of files in the bucket. If we put the upload task before
the GCSUploadSessionCompleteSensor task, the sensor won't detect any changes.
On the other hand, if we add the upload task after the
GCSUploadSessionCompleteSensor task, it would be blocked until the
GCSUploadSessionCompleteSensor task completes, which does not solve the need.
I am unsure how to add a dependency between such tasks. Any suggestions?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]