Copilot commented on code in PR #60164:
URL: https://github.com/apache/airflow/pull/60164#discussion_r2665033131
##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py:
##########
@@ -1741,6 +1742,13 @@ def _sync_to_local_dir_delete_stale_local_files(self, current_s3_objects: list[P
                self.log.error("Error deleting stale item %s: %s", item, e)
                raise e
+    def _compute_local_file_md5(self, file_path: Path) -> str:
+        hash_md5 = hashlib.md5(usedforsecurity=False)
+        with open(file_path, "rb") as f:
+            for chunk in iter(lambda: f.read(8192), b""):
+                hash_md5.update(chunk)
Review Comment:
The MD5 computation doesn't handle potential file I/O errors. If the file is
deleted, locked, or becomes inaccessible between the `exists()` check and this
function call, an unhandled exception will occur. Consider adding error
handling for FileNotFoundError, PermissionError, or other OSError exceptions
that could occur during file reading.
```suggestion
        try:
            with open(file_path, "rb") as f:
                for chunk in iter(lambda: f.read(8192), b""):
                    hash_md5.update(chunk)
        except (FileNotFoundError, PermissionError, OSError) as e:
            self.log.error("Failed to compute MD5 for local file %s: %s", file_path, e)
            raise AirflowException(f"Failed to compute MD5 for local file {file_path}: {e}") from e
```
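
For reference, a minimal sketch of how the complete helper could read with this error handling folded in. The trailing `return hash_md5.hexdigest()` is an assumption (the hunk above is cut off before the return line), and a single `OSError` clause is shown because `FileNotFoundError` and `PermissionError` are both subclasses of it:

```python
# Sketch only: assumes this lives on the S3 hook class alongside the other
# _sync_to_local_dir_* helpers, with hashlib and AirflowException imported at module level.
def _compute_local_file_md5(self, file_path: Path) -> str:
    hash_md5 = hashlib.md5(usedforsecurity=False)
    try:
        with open(file_path, "rb") as f:
            # Read in 8 KiB chunks so large files are never loaded into memory at once.
            for chunk in iter(lambda: f.read(8192), b""):
                hash_md5.update(chunk)
    except OSError as e:
        # Covers FileNotFoundError and PermissionError as well, since both subclass OSError.
        self.log.error("Failed to compute MD5 for local file %s: %s", file_path, e)
        raise AirflowException(f"Failed to compute MD5 for local file {file_path}: {e}") from e
    # Assumed return value: the hex digest, which is what a non-multipart S3 ETag contains.
    return hash_md5.hexdigest()
```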
##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py:
##########
@@ -1755,11 +1763,33 @@ def _sync_to_local_dir_if_changed(self, s3_bucket, s3_object, local_target_path:
                download_msg = (
                    f"S3 object size ({s3_object.size}) and local file size ({local_stats.st_size}) differ."
                )
-
-            s3_last_modified = s3_object.last_modified
-            if local_stats.st_mtime < s3_last_modified.microsecond:
-                should_download = True
-                download_msg = f"S3 object last modified ({s3_last_modified.microsecond}) and local file last modified ({local_stats.st_mtime}) differ."
+            else:
+                s3_etag = s3_object.e_tag
+                if s3_etag:
+                    if "-" not in s3_etag:
+                        local_md5 = self._compute_local_file_md5(local_target_path)
+                        if local_md5 != s3_etag:
+                            should_download = True
+                            download_msg = (
+                                f"S3 object ETag ({s3_etag}) and local file MD5 ({local_md5}) differ "
+                                f"(content changed while size remained the same)."
+                            )
+                    else:
+                        s3_last_modified = s3_object.last_modified
+                        if s3_last_modified and local_stats.st_mtime < s3_last_modified.timestamp():
+                            should_download = True
+                            download_msg = (
+                                f"S3 object last modified ({s3_last_modified}) is newer than "
+                                f"local file last modified ({datetime.fromtimestamp(local_stats.st_mtime)})."
+                            )
+                else:
+                    s3_last_modified = s3_object.last_modified
+                    if s3_last_modified and local_stats.st_mtime < s3_last_modified.timestamp():
+                        should_download = True
+                        download_msg = (
+                            f"S3 object last modified ({s3_last_modified}) is newer than "
+                            f"local file last modified ({datetime.fromtimestamp(local_stats.st_mtime)})."
+                        )
Review Comment:
The code has duplicate logic for timestamp comparison in two different
branches (lines 1778-1784 and 1786-1792). This duplicated code reduces
maintainability. Consider extracting the timestamp comparison logic into a
helper method or consolidating the conditionals to avoid repetition.
```suggestion
                s3_last_modified = s3_object.last_modified
                if s3_etag and "-" not in s3_etag:
                    local_md5 = self._compute_local_file_md5(local_target_path)
                    if local_md5 != s3_etag:
                        should_download = True
                        download_msg = (
                            f"S3 object ETag ({s3_etag}) and local file MD5 ({local_md5}) differ "
                            f"(content changed while size remained the same)."
                        )
                elif s3_last_modified and local_stats.st_mtime < s3_last_modified.timestamp():
                    should_download = True
                    download_msg = (
                        f"S3 object last modified ({s3_last_modified}) is newer than "
                        f"local file last modified ({datetime.fromtimestamp(local_stats.st_mtime)})."
                    )
```
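
If the helper-method route is preferred instead of consolidating the conditionals, a rough sketch of what that could look like (the name `_is_local_copy_stale` and its signature are hypothetical, not part of this PR):

```python
# Hypothetical helper on the hook class; name and signature are illustrative only.
def _is_local_copy_stale(self, local_stats, s3_last_modified) -> bool:
    """Return True when the S3 object's last-modified time is newer than the local file's mtime."""
    return bool(s3_last_modified) and local_stats.st_mtime < s3_last_modified.timestamp()
```

Both branches of the current diff could then call `self._is_local_copy_stale(local_stats, s3_object.last_modified)` and build the same `download_msg`, which removes the duplication without changing behaviour.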
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.