simi opened a new pull request, #62013:
URL: https://github.com/apache/airflow/pull/62013

   Previously, `get_fs()` extracted a static access token string via 
`_get_access_token()` and passed it to `GCSFileSystem`. This token expires 
after ~1 hour with no way to refresh, causing 401 errors in long-running tasks.
   
   By passing the `Credentials` object directly, `gcsfs` can automatically 
refresh the token before each request via its built-in `maybe_refresh()` 
mechanism.
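
To illustrate the difference described above, here is a minimal, stdlib-only sketch. It is not the `gcsfs` or Airflow implementation: `FakeCredentials`, `StaticTokenClient`, `RefreshingClient`, and `simulate_expiry` are hypothetical names made up for this example, and the refresh check only loosely mimics what a real client would do before each request.

```python
import datetime as dt


class FakeCredentials:
    """Hypothetical stand-in for a refreshable credentials object."""

    def __init__(self, lifetime_seconds=3600):
        self._lifetime = dt.timedelta(seconds=lifetime_seconds)
        self._counter = 0
        self.token = None
        self.expiry = None
        self.refresh()

    @property
    def expired(self):
        return dt.datetime.now(dt.timezone.utc) >= self.expiry

    def refresh(self):
        # Issue a new token and push the expiry forward.
        self._counter += 1
        self.token = f"token-{self._counter}"
        self.expiry = dt.datetime.now(dt.timezone.utc) + self._lifetime


class StaticTokenClient:
    """Old behaviour (assumed shape): only the token string is kept."""

    def __init__(self, token):
        self._token = token

    def auth_header(self):
        # No credentials object is available, so nothing can be refreshed.
        return f"Bearer {self._token}"


class RefreshingClient:
    """New behaviour (assumed shape): the credentials object is kept."""

    def __init__(self, credentials):
        self._credentials = credentials

    def auth_header(self):
        # Refresh before building the header if the token has expired.
        if self._credentials.expired:
            self._credentials.refresh()
        return f"Bearer {self._credentials.token}"


def simulate_expiry(credentials):
    """Move the expiry into the past, as if an hour had gone by."""
    credentials.expiry = dt.datetime.now(dt.timezone.utc) - dt.timedelta(hours=1)


creds = FakeCredentials()
static_client = StaticTokenClient(creds.token)
refreshing_client = RefreshingClient(creds)

simulate_expiry(creds)

print(static_client.auth_header())      # still sends the stale "token-1"
print(refreshing_client.auth_header())  # sends a freshly issued "token-2"
```

The static client keeps sending the token it was constructed with, which is the situation that would surface as a 401 against a real service, while the client holding the credentials object picks up a new token on the next call.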
   
   The static-token approach was introduced in #38102 to fix #37834, where 
`credentials.token` was `None` before a manual refresh.
   
   Discussed on Slack a few weeks ago with @shahar1 @potiuk.
   
   ---
   
   I have added some basic mocked tests, but I am not sure how much value they 
bring. I have also tested locally with the following DAG. Before the change, 
the second write fails; after it, the token is refreshed and the task continues.
   
   ```python
   import datetime
   
   from airflow.decorators import dag, task
   from airflow.io.path import ObjectStoragePath
   
   GCS_CONN_ID = "gcs_test"
   # Change this to your bucket
   BUCKET = "gs://example-bucket"
   
   base = ObjectStoragePath(BUCKET, conn_id=GCS_CONN_ID)
   
   
   @dag(
       schedule=None,
       start_date=datetime.datetime(2024, 1, 1),
       catchup=False,
       tags=["test", "gcs"],
   )
   def test_gcs_token_refresh():
       @task
       def write_and_mangle_and_write_again():
           import datetime as dt
   
           path1 = base / "token_test_1.txt"
           path2 = base / "token_test_2.txt"
   
           # First write - should work with fresh token
           print("1. Writing first file...")
           with path1.open("w") as f:
               f.write("first write - fresh token")
           print(f"   OK: {path1}")
   
           # Mangle the token to simulate expiry
           # Access the underlying gcsfs filesystem and its credentials
           fs = path1.fs
           creds = fs.credentials.credentials
            print("\n2. Mangling token to simulate expiry...")
            print(f"   Token before: {creds.token[:30]}...")
            creds.token = "expired_garbage_token"
            # Naive UTC timestamp one hour in the past (avoids the deprecated utcnow())
            now_utc = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
            creds.expiry = now_utc - dt.timedelta(hours=1)
            print(f"   Token after:  {creds.token}")
            print(f"   Expired:      {creds.expired}")
   
           # Second write - with our fix, gcsfs should auto-refresh
           # With the old code (_get_access_token), this would fail with 401
            print("\n3. Writing second file (should auto-refresh)...")
           with path2.open("w") as f:
               f.write("second write - after token refresh")
           print(f"   OK: {path2}")
           print(f"   Token now: {creds.token[:30]}...")
   
           # Cleanup
           path1.unlink()
           path2.unlink()
           print("\n4. Cleanup done. Token refresh works!")
   
       write_and_mangle_and_write_again()
   
   
   test_gcs_token_refresh()
   
   ```

