This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a67a7033b2 Put AIP-44 internal API behind feature flag (#30510)
a67a7033b2 is described below
commit a67a7033b218345885b9b14f5b42067554b7b5ab
Author: Jarek Potiuk <[email protected]>
AuthorDate: Fri Apr 7 09:15:35 2023 +0200
Put AIP-44 internal API behind feature flag (#30510)
This includes:
* a configurable setting with defaults taken from an environment variable
* raising an exception if config variables are used while the feature
flag is not enabled
* hiding config values (adding a mechanism to hide config values
that are set for future versions)
* skipping tests
---
.github/workflows/ci.yml | 50 ++++++++++++++++++++-
.pre-commit-config.yaml | 2 +-
airflow/api_internal/internal_api_call.py | 5 ++-
airflow/cli/cli_config.py | 51 ++++++++++++----------
airflow/config_templates/config.yml | 8 ++--
airflow/config_templates/default_airflow.cfg | 10 -----
airflow/settings.py | 8 ++++
airflow/www/app.py | 3 ++
docs/conf.py | 15 ++++++-
scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py | 14 ++++++
.../endpoints/test_rpc_api_endpoint.py | 2 +
tests/api_internal/test_internal_api_call.py | 3 ++
tests/cli/commands/test_internal_api_command.py | 2 +
13 files changed, 132 insertions(+), 41 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e3c688b59f..f7f19fae29 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -45,6 +45,7 @@ env:
IMAGE_TAG: "${{ github.event.pull_request.head.sha || github.sha }}"
USE_SUDO: "true"
INCLUDE_SUCCESS_OUTPUTS: "true"
+ AIRFLOW_ENABLE_AIP_44: "true"
concurrency:
group: ci-${{ github.event.pull_request.number || github.ref }}
@@ -937,7 +938,54 @@ jobs:
BACKEND_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
UPGRADE_BOTO: "true"
JOB_ID: >
- postgres-${{needs.build-info.outputs.default-python-version}}-
+ postgres-boto-${{needs.build-info.outputs.default-python-version}}-
+ ${{needs.build-info.outputs.default-postgres-version}}
+ COVERAGE: "${{needs.build-info.outputs.run-coverage}}"
+ if: needs.build-info.outputs.run-tests == 'true' &&
needs.build-info.outputs.run-amazon-tests == 'true'
+ steps:
+ - name: Cleanup repo
+ shell: bash
+ run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm
-rf /workspace/*"
+ - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
+ uses: actions/checkout@v3
+ with:
+ persist-credentials: false
+ - name: >
+ Prepare breeze & CI image:
${{needs.build-info.outputs.default-python-version}}:${{env.IMAGE_TAG}}
+ uses: ./.github/actions/prepare_breeze_and_image
+ - name: >
+ Tests: ${{needs.build-info.outputs.default-python-version}}:
+ ${{needs.build-info.outputs.parallel-test-types}}
+ run: breeze testing tests --run-in-parallel
+ - name: >
+ Post Tests: ${{needs.build-info.outputs.default-python-version}}:
+ ${{needs.build-info.outputs.parallel-test-types}}
+ uses: ./.github/actions/post_tests
+
+ tests-postgres-in-progress-features-disabled:
+ timeout-minutes: 130
+ name: >
+
InProgressDisabledPostgres${{needs.build-info.outputs.default-postgres-version}},
+ Py${{needs.build-info.outputs.default-python-version}}:
+ ${{needs.build-info.outputs.parallel-test-types}}
+ runs-on: "${{needs.build-info.outputs.runs-on}}"
+ needs: [build-info, test-pytest-collection]
+ env:
+ RUNS_ON: "${{needs.build-info.outputs.runs-on}}"
+ PARALLEL_TEST_TYPES: "${{needs.build-info.outputs.parallel-test-types}}"
+ SUSPENDED_PROVIDERS_FOLDERS: "${{
needs.build-info.outputs.suspended-providers-folders }}"
+ PR_LABELS: "${{needs.build-info.outputs.pull-request-labels}}"
+ FULL_TESTS_NEEDED: "${{needs.build-info.outputs.full-tests-needed}}"
+ DEBUG_RESOURCES: "${{needs.build-info.outputs.debug-resources}}"
+ BACKEND: "postgres"
+ PYTHON_MAJOR_MINOR_VERSION:
"${{needs.build-info.outputs.default-python-version}}"
+ PYTHON_VERSION: "${{needs.build-info.outputs.default-python-version}}"
+ POSTGRES_VERSION:
"${{needs.build-info.outputs.default-postgres-version}}"
+ BACKEND_VERSION: "${{needs.build-info.outputs.default-postgres-version}}"
+ AIRFLOW_ENABLE_AIP_44: "false"
+ AIRFLOW_ENABLE_AIP_52: "false"
+ JOB_ID: >
+
postgres-in-progress-disabled-${{needs.build-info.outputs.default-python-version}}-
${{needs.build-info.outputs.default-postgres-version}}
COVERAGE: "${{needs.build-info.outputs.run-coverage}}"
if: needs.build-info.outputs.run-tests == 'true' &&
needs.build-info.outputs.run-amazon-tests == 'true'
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index fab996d80d..d99006b937 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -593,7 +593,7 @@ repos:
files: config\.yml$|default_airflow\.cfg$|default\.cfg$
pass_filenames: false
require_serial: true
- additional_dependencies: ['pyyaml']
+ additional_dependencies: ['pyyaml', 'packaging']
- id: check-boring-cyborg-configuration
name: Checks for Boring Cyborg configuration consistency
language: python
diff --git a/airflow/api_internal/internal_api_call.py
b/airflow/api_internal/internal_api_call.py
index e8627ec68b..d0e848e793 100644
--- a/airflow/api_internal/internal_api_call.py
+++ b/airflow/api_internal/internal_api_call.py
@@ -26,6 +26,7 @@ import requests
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException
+from airflow.settings import _ENABLE_AIP_44
from airflow.typing_compat import ParamSpec
PS = ParamSpec("PS")
@@ -63,7 +64,9 @@ class InternalApiConfig:
@staticmethod
def _init_values():
- use_internal_api = conf.getboolean("core", "database_access_isolation")
+ use_internal_api = conf.getboolean("core",
"database_access_isolation", fallback=False)
+ if use_internal_api and not _ENABLE_AIP_44:
+ raise RuntimeError("The AIP_44 is not enabled so you cannot use
it.")
internal_api_endpoint = ""
if use_internal_api:
internal_api_url = conf.get("core", "internal_api_url")
diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index 5502a68fb8..e08b4e01ec 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -34,6 +34,7 @@ from airflow.cli.commands.legacy_commands import
check_legacy_command
from airflow.configuration import conf
from airflow.executors.executor_constants import CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR
from airflow.executors.executor_loader import ExecutorLoader
+from airflow.settings import _ENABLE_AIP_44
from airflow.utils.cli import ColorMode
from airflow.utils.module_loading import import_string
from airflow.utils.timezone import parse as parsedate
@@ -2068,29 +2069,6 @@ core_commands: list[CLICommand] = [
ARG_DEBUG,
),
),
- ActionCommand(
- name="internal-api",
- help="Start a Airflow Internal API instance",
-
func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"),
- args=(
- ARG_INTERNAL_API_PORT,
- ARG_INTERNAL_API_WORKERS,
- ARG_INTERNAL_API_WORKERCLASS,
- ARG_INTERNAL_API_WORKER_TIMEOUT,
- ARG_INTERNAL_API_HOSTNAME,
- ARG_PID,
- ARG_DAEMON,
- ARG_STDOUT,
- ARG_STDERR,
- ARG_INTERNAL_API_ACCESS_LOGFILE,
- ARG_INTERNAL_API_ERROR_LOGFILE,
- ARG_INTERNAL_API_ACCESS_LOGFORMAT,
- ARG_LOG_FILE,
- ARG_SSL_CERT,
- ARG_SSL_KEY,
- ARG_DEBUG,
- ),
- ),
ActionCommand(
name="scheduler",
help="Start a scheduler instance",
@@ -2231,6 +2209,33 @@ core_commands: list[CLICommand] = [
),
]
+if _ENABLE_AIP_44:
+ core_commands.append(
+ ActionCommand(
+ name="internal-api",
+ help="Start a Airflow Internal API instance",
+
func=lazy_load_command("airflow.cli.commands.internal_api_command.internal_api"),
+ args=(
+ ARG_INTERNAL_API_PORT,
+ ARG_INTERNAL_API_WORKERS,
+ ARG_INTERNAL_API_WORKERCLASS,
+ ARG_INTERNAL_API_WORKER_TIMEOUT,
+ ARG_INTERNAL_API_HOSTNAME,
+ ARG_PID,
+ ARG_DAEMON,
+ ARG_STDOUT,
+ ARG_STDERR,
+ ARG_INTERNAL_API_ACCESS_LOGFILE,
+ ARG_INTERNAL_API_ERROR_LOGFILE,
+ ARG_INTERNAL_API_ACCESS_LOGFORMAT,
+ ARG_LOG_FILE,
+ ARG_SSL_CERT,
+ ARG_SSL_KEY,
+ ARG_DEBUG,
+ ),
+ ),
+ )
+
def _remove_dag_id_opt(command: ActionCommand):
cmd = command._asdict()
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index c4ce03e586..29e02df9b3 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -438,14 +438,14 @@ core:
example: '{"some_param": "some_value"}'
database_access_isolation:
description: (experimental) Whether components should use Airflow
Internal API for DB connectivity.
- version_added: 2.6.0
+ version_added: 2.7.0
type: boolean
example: ~
default: "False"
internal_api_url:
description: |
- (experimental)Airflow Internal API url. Only used if [core]
database_access_isolation is True.
- version_added: 2.6.0
+ (experimental) Airflow Internal API url. Only used if [core]
database_access_isolation is True.
+ version_added: 2.7.0
type: string
default: ~
example: 'http://localhost:8080'
@@ -1674,7 +1674,7 @@ webserver:
run_internal_api:
description: |
Boolean for running Internal API in the webserver.
- version_added: 2.6.0
+ version_added: 2.7.0
type: boolean
example: ~
default: "False"
diff --git a/airflow/config_templates/default_airflow.cfg
b/airflow/config_templates/default_airflow.cfg
index 22409997c6..257536cc03 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -250,13 +250,6 @@ daemon_umask = 0o077
# Example: dataset_manager_kwargs = {{"some_param": "some_value"}}
# dataset_manager_kwargs =
-# (experimental) Whether components should use Airflow Internal API for DB
connectivity.
-database_access_isolation = False
-
-# (experimental)Airflow Internal API url. Only used if [core]
database_access_isolation is True.
-# Example: internal_api_url = http://localhost:8080
-# internal_api_url =
-
[database]
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines.
@@ -857,9 +850,6 @@ audit_view_excluded_events =
gantt,landing_times,tries,duration,calendar,graph,g
# Boolean for running SwaggerUI in the webserver.
enable_swagger_ui = True
-# Boolean for running Internal API in the webserver.
-run_internal_api = False
-
# Boolean for enabling rate limiting on authentication endpoints.
auth_rate_limited = True
diff --git a/airflow/settings.py b/airflow/settings.py
index 98d0509419..79f8d0fa6e 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -605,3 +605,11 @@ DASHBOARD_UIALERTS: list[UIAlert] = []
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"
DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")
+
+
+# AIP-52: setup/teardown (experimental)
+# This feature is not complete yet, so we disable it by default.
+_ENABLE_AIP_52 = os.environ.get("AIRFLOW_ENABLE_AIP_52", "false").lower() in
{"true", "t", "yes", "y", "1"}
+# AIP-44: internal_api (experimental)
+# This feature is not complete yet, so we disable it by default.
+_ENABLE_AIP_44 = os.environ.get("AIRFLOW_ENABLE_AIP_44", "false").lower() in
("true", "t", "yes", "y", "1")
diff --git a/airflow/www/app.py b/airflow/www/app.py
index abcf76ec0d..5283718089 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -34,6 +34,7 @@ from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, RemovedInAirflow3Warning
from airflow.logging_config import configure_logging
from airflow.models import import_all_models
+from airflow.settings import _ENABLE_AIP_44
from airflow.utils.json import AirflowJsonProvider
from airflow.www.extensions.init_appbuilder import init_appbuilder
from airflow.www.extensions.init_appbuilder_links import init_appbuilder_links
@@ -161,6 +162,8 @@ def create_app(config=None, testing=False):
init_error_handlers(flask_app)
init_api_connexion(flask_app)
if conf.getboolean("webserver", "run_internal_api", fallback=False):
+ if not _ENABLE_AIP_44:
+ raise RuntimeError("The AIP_44 is not enabled so you cannot
use it.")
init_api_internal(flask_app)
init_api_experimental(flask_app)
diff --git a/docs/conf.py b/docs/conf.py
index 40e6792864..56ea29b6ce 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -33,12 +33,14 @@ from __future__ import annotations
import json
import os
import pathlib
+import re
import sys
from collections import defaultdict
from pathlib import Path
from typing import Any
import yaml
+from packaging.version import parse as parse_version
import airflow
from airflow.configuration import AirflowConfigParser, default_config_yaml
@@ -386,6 +388,13 @@ html_context = {
# -- Options for sphinx_jinja ------------------------------------------
# See: https://github.com/tardyp/sphinx-jinja
+airflow_version = parse_version(
+ re.search( # type: ignore[union-attr,arg-type]
+ r"__version__ = \"([0-9\.]*)(\.dev[0-9]*)?\"",
+ (Path(__file__).parents[1] / "airflow" / "__init__.py").read_text(),
+ ).groups(0)[0]
+)
+
# Jinja context
if PACKAGE_NAME == "apache-airflow":
deprecated_options: dict[str, dict[str, tuple[str, str, str]]] =
defaultdict(dict)
@@ -401,10 +410,14 @@ if PACKAGE_NAME == "apache-airflow":
# e.g. {{dag_id}} in default_config.cfg -> {dag_id} in airflow.cfg, and
what we want in docs
keys_to_format = ["default", "example"]
for conf_name, conf_section in configs.items():
- for option_name, option in conf_section["options"].items():
+ for option_name, option in list(conf_section["options"].items()):
for key in keys_to_format:
if option[key] and "{{" in option[key]:
option[key] = option[key].replace("{{", "{").replace("}}",
"}")
+ version_added = option["version_added"]
+ if version_added is not None and parse_version(version_added) >
airflow_version:
+ del conf_section["options"][option_name]
+
# Sort options, config and deprecated options for JINJA variables to
display
for section_name, config in configs.items():
config["options"] = {k: v for k, v in
sorted(config["options"].items())}
diff --git a/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
b/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
index 97cb921a69..45aff2c2a1 100755
--- a/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
+++ b/scripts/ci/pre_commit/pre_commit_yaml_to_cfg.py
@@ -22,8 +22,11 @@ Module to convert Airflow configs in config.yml to
default_airflow.cfg file
from __future__ import annotations
import os
+import re
+from pathlib import Path
import yaml
+from packaging.version import parse as parse_version
FILE_HEADER = """#
# Licensed to the Apache Software Foundation (ASF) under one
@@ -105,6 +108,10 @@ def _write_section(configfile, section_name, section):
def _write_option(configfile, idx, option_name, option):
option_description = None
+ version_added = option["version_added"]
+ if version_added is not None and parse_version(version_added) >
airflow_version:
+ # skip if option is going to be added in the future version
+ return
if option["description"] is not None:
option_description = list(filter(lambda x: x is not None,
option["description"].splitlines()))
@@ -145,6 +152,13 @@ if __name__ == "__main__":
airflow_default_config_path = os.path.join(airflow_config_dir,
"default_airflow.cfg")
airflow_config_yaml_file_path = os.path.join(airflow_config_dir,
"config.yml")
+ airflow_version = parse_version(
+ re.search( # type: ignore[union-attr,arg-type]
+ r"__version__ = \"([0-9\.]*)(\.dev[0-9]*)?\"",
+ (Path(__file__).parents[3] / "airflow" /
"__init__.py").read_text(),
+ ).groups(0)[0]
+ )
+
write_config(
yaml_config_file_path=airflow_config_yaml_file_path,
default_cfg_file_path=airflow_default_config_path
)
diff --git a/tests/api_internal/endpoints/test_rpc_api_endpoint.py
b/tests/api_internal/endpoints/test_rpc_api_endpoint.py
index 39f3af7388..bd7a7dc189 100644
--- a/tests/api_internal/endpoints/test_rpc_api_endpoint.py
+++ b/tests/api_internal/endpoints/test_rpc_api_endpoint.py
@@ -27,6 +27,7 @@ from airflow.models.taskinstance import TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
from airflow.serialization.serialized_objects import BaseSerialization
+from airflow.settings import _ENABLE_AIP_44
from airflow.utils.state import State
from airflow.www import app
from tests.test_utils.config import conf_vars
@@ -53,6 +54,7 @@ def minimal_app_for_internal_api() -> Flask:
return factory()
[email protected](not _ENABLE_AIP_44, reason="AIP-44 is disabled")
class TestRpcApiEndpoint:
@pytest.fixture(autouse=True)
def setup_attrs(self, minimal_app_for_internal_api: Flask) -> Generator:
diff --git a/tests/api_internal/test_internal_api_call.py
b/tests/api_internal/test_internal_api_call.py
index a70e5d6993..67c4fb533c 100644
--- a/tests/api_internal/test_internal_api_call.py
+++ b/tests/api_internal/test_internal_api_call.py
@@ -29,6 +29,7 @@ from airflow.models.taskinstance import TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
from airflow.serialization.serialized_objects import BaseSerialization
+from airflow.settings import _ENABLE_AIP_44
from airflow.utils.state import State
from tests.test_utils.config import conf_vars
@@ -38,6 +39,7 @@ def reset_init_api_config():
InternalApiConfig._initialized = False
[email protected](not _ENABLE_AIP_44, reason="AIP-44 is disabled")
class TestInternalApiConfig:
@conf_vars(
{
@@ -69,6 +71,7 @@ class TestInternalApiConfig:
assert InternalApiConfig.get_use_internal_api() is False
[email protected](not _ENABLE_AIP_44, reason="AIP-44 is disabled")
class TestInternalApiCall:
@staticmethod
@internal_api_call
diff --git a/tests/cli/commands/test_internal_api_command.py
b/tests/cli/commands/test_internal_api_command.py
index ab25c58de3..472aab5b1f 100644
--- a/tests/cli/commands/test_internal_api_command.py
+++ b/tests/cli/commands/test_internal_api_command.py
@@ -31,6 +31,7 @@ from airflow import settings
from airflow.cli import cli_parser
from airflow.cli.commands import internal_api_command
from airflow.cli.commands.internal_api_command import GunicornMonitor
+from airflow.settings import _ENABLE_AIP_44
from tests.cli.commands._common_cli_classes import _ComonCLIGunicornTestClass
console = Console(width=400, color_system="standard")
@@ -82,6 +83,7 @@ class TestCLIGetNumReadyWorkersRunning:
assert self.monitor._get_num_ready_workers_running() == 0
[email protected](not _ENABLE_AIP_44, reason="AIP-44 is disabled")
class TestCliInternalAPI(_ComonCLIGunicornTestClass):
main_process_regexp = r"airflow internal-api"