pankajkoti commented on code in PR #22808:
URL: https://github.com/apache/airflow/pull/22808#discussion_r848059666


##########
airflow/providers/google/cloud/example_dags/example_gcs.py:
##########
@@ -37,35 +37,44 @@
 from airflow.providers.google.cloud.sensors.gcs import (
     GCSObjectExistenceSensor,
     GCSObjectsWithPrefixExistenceSensor,
+    GCSObjectUpdateSensor,
+    GCSUploadSessionCompleteSensor,
 )
 from airflow.providers.google.cloud.transfers.gcs_to_gcs import 
GCSToGCSOperator
 from airflow.providers.google.cloud.transfers.gcs_to_local import 
GCSToLocalFilesystemOperator
 from airflow.providers.google.cloud.transfers.local_to_gcs import 
LocalFilesystemToGCSOperator
 
 START_DATE = datetime(2021, 1, 1)
 
-PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-id")
-BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
-GCS_ACL_ENTITY = os.environ.get("GCS_ACL_ENTITY", "allUsers")
+PROJECT_ID = os.getenv("GCP_PROJECT_ID", "example-id")
+BUCKET_1 = os.getenv("GCP_GCS_BUCKET_1", "test-gcs-example-bucket")
+GCS_ACL_ENTITY = os.getenv("GCS_ACL_ENTITY", "allUsers")
 GCS_ACL_BUCKET_ROLE = "OWNER"
 GCS_ACL_OBJECT_ROLE = "OWNER"
 
-BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
+BUCKET_2 = os.getenv("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2")
 
 temp_dir_path = gettempdir()
-PATH_TO_TRANSFORM_SCRIPT = os.environ.get(
+PATH_TO_TRANSFORM_SCRIPT = os.getenv(
     "GCP_GCS_PATH_TO_TRANSFORM_SCRIPT", os.path.join(temp_dir_path, 
"transform_script.py")
 )
-PATH_TO_UPLOAD_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE = os.getenv(
     "GCP_GCS_PATH_TO_UPLOAD_FILE", os.path.join(temp_dir_path, 
"test-gcs-example-upload.txt")
 )
-PATH_TO_UPLOAD_FILE_PREFIX = 
os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-")
-PATH_TO_SAVED_FILE = os.environ.get(
+PATH_TO_UPLOAD_FILE_PREFIX = os.getenv("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", 
"test-gcs-")
+PATH_TO_SAVED_FILE = os.getenv(
     "GCP_GCS_PATH_TO_SAVED_FILE", os.path.join(temp_dir_path, 
"test-gcs-example-download.txt")
 )
 
 BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]
 
+# Upload 'test-gcs-manual-example-upload.txt' manually in the <BUCKET_1> after 
triggering the DAG.

Review Comment:
   Hi @bhirsz, I wanted to upload the file programmatically, like in the other 
chain. However, the GCSUploadSessionCompleteSensor task waits for a change in 
the number of files in the bucket. If we run the upload task before the 
GCSUploadSessionCompleteSensor task, the sensor won't detect any change. On the 
other hand, if we add the upload task after the GCSUploadSessionCompleteSensor 
task, it would be blocked until the sensor task completes, which does not solve 
the problem. I am unsure how to add a dependency between such tasks. Any 
suggestions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to