This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4869575b2c Refactor os.path.splitext to Path.* (#34352)
4869575b2c is described below
commit 4869575b2c538b54cbc9368791a924f7cd5f7ce8
Author: Miroslav Šedivý <[email protected]>
AuthorDate: Fri Sep 15 05:57:05 2023 +0000
Refactor os.path.splitext to Path.* (#34352)
---
airflow/cli/commands/internal_api_command.py | 7 +++++--
airflow/cli/commands/webserver_command.py | 7 +++++--
airflow/dag_processing/manager.py | 7 ++-----
airflow/models/dagbag.py | 21 +++++++++------------
docs/exts/docs_build/dev_index_generator.py | 22 +++++++---------------
5 files changed, 28 insertions(+), 36 deletions(-)
diff --git a/airflow/cli/commands/internal_api_command.py b/airflow/cli/commands/internal_api_command.py
index 7b2cf798da..73ed2e2501 100644
--- a/airflow/cli/commands/internal_api_command.py
+++ b/airflow/cli/commands/internal_api_command.py
@@ -24,6 +24,7 @@ import subprocess
import sys
import textwrap
from contextlib import suppress
+from pathlib import Path
from tempfile import gettempdir
from time import sleep
@@ -170,13 +171,15 @@ def internal_api(args):
handle = setup_logging(log_file)
- base, ext = os.path.splitext(pid_file)
+ pid_path = Path(pid_file)
+ pidlock_path = pid_path.with_name(f"{pid_path.stem}-monitor{pid_path.suffix}")
+
with open(stdout, "a") as stdout, open(stderr, "a") as stderr:
stdout.truncate(0)
stderr.truncate(0)
ctx = daemon.DaemonContext(
- pidfile=TimeoutPIDLockFile(f"{base}-monitor{ext}", -1),
+ pidfile=TimeoutPIDLockFile(pidlock_path, -1),
files_preserve=[handle],
stdout=stdout,
stderr=stderr,
diff --git a/airflow/cli/commands/webserver_command.py b/airflow/cli/commands/webserver_command.py
index 79694db59d..beae141a6b 100644
--- a/airflow/cli/commands/webserver_command.py
+++ b/airflow/cli/commands/webserver_command.py
@@ -25,6 +25,7 @@ import sys
import textwrap
import time
from contextlib import suppress
+from pathlib import Path
from time import sleep
from typing import TYPE_CHECKING, NoReturn
@@ -476,13 +477,15 @@ def webserver(args):
handle = setup_logging(log_file)
- base, ext = os.path.splitext(pid_file)
+ pid_path = Path(pid_file)
+ pidlock_path = pid_path.with_name(f"{pid_path.stem}-monitor{pid_path.suffix}")
+
with open(stdout, "a") as stdout, open(stderr, "a") as stderr:
stdout.truncate(0)
stderr.truncate(0)
ctx = daemon.DaemonContext(
- pidfile=TimeoutPIDLockFile(f"{base}-monitor{ext}", -1),
+ pidfile=TimeoutPIDLockFile(pidlock_path, -1),
files_preserve=[handle],
stdout=stdout,
stderr=stderr,
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 9b76bb9374..ab93a21026 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -850,9 +850,7 @@ class DagFileProcessorManager(LoggingMixin):
last_runtime = self.get_last_runtime(file_path)
num_dags = self.get_last_dag_count(file_path)
num_errors = self.get_last_error_count(file_path)
- file_name = os.path.basename(file_path)
- file_name = os.path.splitext(file_name)[0].replace(os.sep, ".")
-
+ file_name = Path(file_path).stem
processor_pid = self.get_pid(file_path)
processor_start_time = self.get_start_time(file_path)
runtime = (now - processor_start_time) if processor_start_time else None
@@ -1042,8 +1040,7 @@ class DagFileProcessorManager(LoggingMixin):
run_count=self.get_run_count(processor.file_path) + 1,
)
self._file_stats[processor.file_path] = stat
-
- file_name = os.path.splitext(os.path.basename(processor.file_path))[0].replace(os.sep, ".")
+ file_name = Path(processor.file_path).stem
Stats.timing(f"dag_processing.last_duration.{file_name}", last_duration)
Stats.timing("dag_processing.last_duration", last_duration, tags={"file_name": file_name})
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index b603cf08be..4e2efeefde 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -28,6 +28,7 @@ import traceback
import warnings
import zipfile
from datetime import datetime, timedelta
+from pathlib import Path
from typing import TYPE_CHECKING, NamedTuple
from sqlalchemy.exc import OperationalError
@@ -55,8 +56,6 @@ from airflow.utils.timeout import timeout
from airflow.utils.types import NOTSET
if TYPE_CHECKING:
- import pathlib
-
from sqlalchemy.orm import Session
from airflow.models.dag import DAG
@@ -95,7 +94,7 @@ class DagBag(LoggingMixin):
def __init__(
self,
- dag_folder: str | pathlib.Path | None = None,
+ dag_folder: str | Path | None = None,
include_examples: bool | ArgNotSet = NOTSET,
safe_mode: bool | ArgNotSet = NOTSET,
read_dags_from_db: bool = False,
@@ -327,8 +326,8 @@ class DagBag(LoggingMixin):
return []
self.log.debug("Importing %s", filepath)
- org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
path_hash = hashlib.sha1(filepath.encode("utf-8")).hexdigest()
+ org_mod_name = Path(filepath).stem
mod_name = f"unusual_prefix_{path_hash}_{org_mod_name}"
if mod_name in sys.modules:
@@ -380,15 +379,12 @@ class DagBag(LoggingMixin):
mods = []
with zipfile.ZipFile(filepath) as current_zip_file:
for zip_info in current_zip_file.infolist():
- head, _ = os.path.split(zip_info.filename)
- mod_name, ext = os.path.splitext(zip_info.filename)
- if ext not in [".py", ".pyc"]:
- continue
- if head:
+ zip_path = Path(zip_info.filename)
+ if zip_path.suffix not in [".py", ".pyc"] or len(zip_path.parts) > 1:
continue
- if mod_name == "__init__":
- self.log.warning("Found __init__.%s at root of %s", ext, filepath)
+ if zip_path.stem == "__init__":
+ self.log.warning("Found %s at root of %s", zip_path.name, filepath)
self.log.debug("Reading %s from %s", zip_info.filename, filepath)
@@ -402,6 +398,7 @@ class DagBag(LoggingMixin):
)
continue
+ mod_name = zip_path.stem
if mod_name in sys.modules:
del sys.modules[mod_name]
@@ -518,7 +515,7 @@ class DagBag(LoggingMixin):
def collect_dags(
self,
- dag_folder: str | pathlib.Path | None = None,
+ dag_folder: str | Path | None = None,
only_if_updated: bool = True,
include_examples: bool = conf.getboolean("core", "LOAD_EXAMPLES"),
safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"),
diff --git a/docs/exts/docs_build/dev_index_generator.py b/docs/exts/docs_build/dev_index_generator.py
index 055384e764..0b9e9072ab 100644
--- a/docs/exts/docs_build/dev_index_generator.py
+++ b/docs/exts/docs_build/dev_index_generator.py
@@ -19,7 +19,7 @@ from __future__ import annotations
import argparse
import os
import sys
-from glob import glob
+from pathlib import Path
import jinja2
@@ -45,24 +45,16 @@ def _render_template(template_name, **kwargs):
def _render_content():
- provider_packages = [
- os.path.basename(os.path.dirname(p)) for p in glob(f"{BUILD_DIR}/docs/apache-airflow-providers-*/")
- ]
providers = []
- for package_name in provider_packages:
+ provider_yamls = {p["package-name"]: p for p in ALL_PROVIDER_YAMLS}
+ for path in sorted(Path(BUILD_DIR).glob("docs/apache-airflow-providers-*/")):
+ package_name = path.name
try:
- current_provider = next(
- provider_yaml
- for provider_yaml in ALL_PROVIDER_YAMLS
- if provider_yaml["package-name"] == package_name
- )
- providers.append(current_provider)
- except StopIteration:
+ providers.append(provider_yamls[package_name])
+ except KeyError:
print(f"WARNING! Could not find provider.yaml file for package: {package_name}")
- content = _render_template(
- "dev_index_template.html.jinja2", providers=sorted(providers, key=lambda k: k["package-name"])
- )
+ content = _render_template("dev_index_template.html.jinja2", providers=providers)
return content