simi opened a new pull request, #62013:
URL: https://github.com/apache/airflow/pull/62013
Previously, `get_fs()` extracted a static access token string via
`_get_access_token()` and passed it to `GCSFileSystem`. This token expires
after ~1 hour and cannot be refreshed, which causes 401 errors in long-running tasks.
Passing the `Credentials` object directly instead lets `gcsfs` refresh the
token automatically before each request through its built-in `maybe_refresh()`
mechanism.
The static-token approach was introduced in #38102 to fix #37834, in which
`credentials.token` was `None` until the credentials had been refreshed manually.
Discussed on Slack a few weeks ago with @shahar1 and @potiuk.
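The refresh behaviour described above can be illustrated with a minimal, self-contained model that uses a simulated clock. `ToyAuthServer`, `ToyCredentials`, `StaticTokenClient` and `RefreshingClient` are hypothetical names invented for this sketch; they are not gcsfs or google-auth classes, and the real `maybe_refresh()` may differ in detail (for example, it may refresh shortly before expiry rather than exactly at it). The point is only that a captured token string goes stale, while a held credentials object can be refreshed on demand.

```python
import datetime as dt

UTC = dt.timezone.utc


class ToyAuthServer:
    """Hypothetical auth server: remembers when each issued token expires."""

    def __init__(self):
        self._expiry = {}

    def issue(self, now, lifetime):
        token = f"token-{len(self._expiry) + 1}"
        self._expiry[token] = now + lifetime
        return token, self._expiry[token]

    def check(self, token, now):
        """Return an HTTP-like status: 200 for a live token, 401 otherwise."""
        expiry = self._expiry.get(token)
        return 200 if expiry is not None and now < expiry else 401


class ToyCredentials:
    """Hypothetical stand-in for a google-auth Credentials object (not the real class)."""

    def __init__(self, server, lifetime=dt.timedelta(hours=1)):
        self.server = server
        self.lifetime = lifetime
        self.token = None  # no token until the first refresh
        self.expiry = None

    def expired(self, now):
        return self.expiry is None or now >= self.expiry

    def refresh(self, now):
        self.token, self.expiry = self.server.issue(now, self.lifetime)


class StaticTokenClient:
    """Old pattern (modelled): a token *string* is captured once and reused."""

    def __init__(self, token, server):
        self.token = token
        self.server = server

    def request(self, now):
        return self.server.check(self.token, now)


class RefreshingClient:
    """New pattern (modelled): keep the credentials *object*, refresh when expired."""

    def __init__(self, creds, server):
        self.creds = creds
        self.server = server

    def maybe_refresh(self, now):
        if self.creds.expired(now):
            self.creds.refresh(now)

    def request(self, now):
        self.maybe_refresh(now)  # checked before every request
        return self.server.check(self.creds.token, now)


server = ToyAuthServer()
t0 = dt.datetime(2024, 1, 1, tzinfo=UTC)
creds = ToyCredentials(server)
creds.refresh(t0)  # fetch a token up front, as the static-token code did
static = StaticTokenClient(creds.token, server)
live = RefreshingClient(creds, server)

two_hours_later = t0 + dt.timedelta(hours=2)
print(static.request(two_hours_later), live.request(two_hours_later))  # 401 200
```

Both clients start from the same credentials; only the one that holds the object, rather than a copy of its token, survives past the one-hour lifetime.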
---
I have added some basic mocked tests, but I'm not sure how much value they
bring. I also tested the change locally with the following DAG: before the fix
it fails with a 401; after the fix the token is refreshed and the task continues.
```python
import datetime

from airflow.decorators import dag, task
from airflow.io.path import ObjectStoragePath

GCS_CONN_ID = "gcs_test"
# Change this to your bucket
BUCKET = "gs://example-bucket"

base = ObjectStoragePath(BUCKET, conn_id=GCS_CONN_ID)


@dag(
    schedule=None,
    start_date=datetime.datetime(2024, 1, 1),
    catchup=False,
    tags=["test", "gcs"],
)
def test_gcs_token_refresh():
    @task
    def write_and_mangle_and_write_again():
        import datetime as dt

        path1 = base / "token_test_1.txt"
        path2 = base / "token_test_2.txt"

        # First write - should work with fresh token
        print("1. Writing first file...")
        with path1.open("w") as f:
            f.write("first write - fresh token")
        print(f" OK: {path1}")

        # Mangle the token to simulate expiry
        # Access the underlying gcsfs filesystem and its credentials
        fs = path1.fs
        creds = fs.credentials.credentials
        print("\n2. Mangling token to simulate expiry...")
        print(f" Token before: {creds.token[:30]}...")
        creds.token = "expired_garbage_token"
        # google-auth keeps `expiry` as a naive UTC datetime
        creds.expiry = dt.datetime.utcnow() - dt.timedelta(hours=1)
        print(f" Token after: {creds.token}")
        print(f" Expired: {creds.expired}")

        # Second write - with our fix, gcsfs should auto-refresh
        # With the old code (_get_access_token), this would fail with 401
        print("\n3. Writing second file (should auto-refresh)...")
        with path2.open("w") as f:
            f.write("second write - after token refresh")
        print(f" OK: {path2}")
        print(f" Token now: {creds.token[:30]}...")

        # Cleanup
        path1.unlink()
        path2.unlink()
        print("\n4. Cleanup done. Token refresh works!")

    write_and_mangle_and_write_again()


test_gcs_token_refresh()
```
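The mocked tests mentioned above are not shown here. As a hedged illustration of what such a test can check without touching GCS, here is a self-contained sketch: `get_fs_sketch` is a hypothetical simplification of the hook's `get_fs()` (its signature is invented for this example), and `GCSFileSystem` is replaced by a `MagicMock`. The test only asserts that the credentials object, not a string, ends up in the `token` argument.

```python
from unittest import mock


def get_fs_sketch(get_credentials, fs_cls, project_id=None):
    """Hypothetical stand-in for the provider's get_fs(); not the real Airflow code.

    It hands the credentials *object* to the filesystem class as ``token``,
    instead of extracting a static access-token string first.
    """
    credentials = get_credentials()
    return fs_cls(project=project_id, token=credentials)


def test_credentials_object_is_passed_through():
    creds = mock.MagicMock(name="Credentials")
    fs_cls = mock.MagicMock(name="GCSFileSystem")

    get_fs_sketch(lambda: creds, fs_cls, project_id="my-project")

    # The filesystem must receive the refreshable object itself...
    fs_cls.assert_called_once_with(project="my-project", token=creds)
    # ...and not a plain token string, which is what the old code passed.
    assert not isinstance(fs_cls.call_args.kwargs["token"], str)


test_credentials_object_is_passed_through()
```

A test like this guards against regressing to the string-token path, but it cannot show that refresh actually happens; that still needs an end-to-end run such as the DAG above.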