This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cd0c6a7e77 Add Scarf based telemetry (#39510)
cd0c6a7e77 is described below
commit cd0c6a7e77fd687e9fdce74afa064199258ceeef
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu May 16 21:56:33 2024 +0100
Add Scarf based telemetry (#39510)
To take [the discussion about adding
Telemetry](https://lists.apache.org/thread/7f6qyr8w2n8w34g63s7ybhzphgt8h43m)
forward, I am creating a draft PR that adds some basic telemetry to send to
Scarf.
Voting thread:
https://lists.apache.org/thread/h1x2glvnd42rbj2q2rgpfo3pjhmpt307
I have added docs on the data collection as well as a way to opt out of it.
Telemetry added at (the shared mechanism is sketched after this list):
- Scheduler startup, via [custom
telemetry](https://docs.scarf.sh/custom-telemetry/), similar to other popular
projects like
[Unstructured](https://github.com/Unstructured-IO/unstructured/commit/f0a63e27122972901f72c42b4b5b9eb9bed1ada3)
- Webserver, via a [tracking
pixel](https://docs.scarf.sh/web-traffic/#creating-a-pixel), similar to [Apache
Superset](https://github.com/apache/superset/pull/26011/files)
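Both call sites reduce to the same mechanism: a fire-and-forget HTTP GET
against the Scarf gateway, carrying the collected properties as query
parameters (scheduler) or as URL path segments (webserver pixel). A minimal
sketch of that pattern, for illustration only; the real implementation is
`airflow/utils/scarf.py` in the diff below, and `send_scarf_ping` here is a
hypothetical name:

```python
# Illustrative sketch only; see airflow/utils/scarf.py below for the
# real implementation. The gateway URL matches the one used there,
# but send_scarf_ping is a made-up helper name.
from urllib.parse import urlencode

import httpx

SCARF_GATEWAY = "https://apacheairflow.gateway.scarf.sh/scheduler"


def send_scarf_ping(properties: dict) -> None:
    """Best-effort, fire-and-forget telemetry ping to the Scarf gateway."""
    url = f"{SCARF_GATEWAY}?{urlencode(properties)}"
    try:
        # Short timeout and a blanket except: telemetry must never
        # block or crash the scheduler.
        httpx.get(url, timeout=5.0)
    except Exception:
        pass
```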
Data collected (a sample ping assembled from these fields follows the list):
- Airflow version
- Python version
- Platform system: Linux/Darwin
- Machine architecture: arm64/aarch64
- Metadata DB type: postgres/mysql/sqlite
- Metadata DB version, e.g. 12.6.3
- Executor
- Number of DAGs
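For illustration, a scheduler ping assembled from sample values of these
fields would look like this (all values hypothetical; the field names match
the `params` dict in `airflow/utils/scarf.py` below):

```python
from urllib.parse import urlencode

# Hypothetical sample values; the real ping reads them from the running
# deployment at scheduler startup.
params = {
    "version": "2.10.0",
    "python_version": "3.8.5",
    "platform": "Linux",
    "arch": "x86_64",
    "database": "postgres",
    "db_version": "12.6.3",
    "executor": "SequentialExecutor",
}
print(f"https://apacheairflow.gateway.scarf.sh/scheduler?{urlencode(params)}")
```

Nothing beyond these fields is placed in the request; opting out is covered by
the `[telemetry_collection] enabled` option and the `SCARF_ANALYTICS=false`
environment variable documented in the diff below.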
---
airflow/cli/commands/scheduler_command.py | 3 +
airflow/config_templates/config.yml | 22 ++++++
airflow/settings.py | 7 ++
airflow/utils/scarf.py | 89 ++++++++++++++++++++++
airflow/www/templates/airflow/dags.html | 3 +
airflow/www/views.py | 34 ++++++++-
docs/apache-airflow/faq.rst | 24 ++++++
.../installation/installing-from-pypi.rst | 6 ++
tests/core/test_settings.py | 25 +++++-
tests/utils/test_scarf.py | 84 ++++++++++++++++++++
tests/www/views/test_views.py | 30 ++++++++
tests/www/views/test_views_home.py | 14 ++++
12 files changed, 339 insertions(+), 2 deletions(-)
diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index 2a55ca2373..4f943e961b 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -33,6 +33,7 @@ from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import process_subdir
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded
+from airflow.utils.scarf import scarf_analytics
 from airflow.utils.scheduler_health import serve_health_check

 log = logging.getLogger(__name__)
@@ -55,6 +56,8 @@ def scheduler(args: Namespace):
     """Start Airflow Scheduler."""
     print(settings.HEADER)

+    scarf_analytics()
+
     run_command_with_daemon_option(
         args=args,
         process_name="scheduler",
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 95d83f9d4c..edfe56b45c 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2591,3 +2591,25 @@ sensors:
       type: float
       example: ~
       default: "604800"
+telemetry_collection:
+  description: |
+    Airflow integrates `Scarf <https://about.scarf.sh/>`__ to collect basic telemetry data during operation.
+    This data assists Airflow maintainers in better understanding how Airflow is used.
+    Insights gained from this telemetry are critical for prioritizing patches, minor releases, and
+    security fixes. Additionally, this information supports key decisions related to the development road map.
+    Check the FAQ doc for more information on what data is collected.
+
+    Deployments can opt out of analytics by setting the ``enabled`` option
+    to ``False``, or the ``SCARF_ANALYTICS=false`` environment variable.
+    Individual users can easily opt out of analytics in various ways documented in the
+    `Scarf Do Not Track docs <https://docs.scarf.sh/gateway/#do-not-track>`__.
+
+  options:
+    enabled:
+      description: |
+        Enable or disable telemetry data collection and sending via Scarf.
+      version_added: 2.10.0
+      type: boolean
+      example: ~
+      default: "True"
+      see_also: ":ref:`Airflow telemetry FAQ <airflow-telemetry-faq>`"
diff --git a/airflow/settings.py b/airflow/settings.py
index 7b8a222444..176d06270e 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -576,6 +576,13 @@ def initialize():
     atexit.register(dispose_orm)


+def is_telemetry_collection_enabled() -> bool:
+    """Check if Scarf analytics is enabled."""
+    return conf.getboolean("telemetry_collection", "enabled", fallback=True) and (
+        os.getenv("SCARF_ANALYTICS", "").strip().lower() != "false"
+    )
+
+
 # Const stuff

 KILOBYTE = 1024
diff --git a/airflow/utils/scarf.py b/airflow/utils/scarf.py
new file mode 100644
index 0000000000..ec19480ee7
--- /dev/null
+++ b/airflow/utils/scarf.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import platform
+from urllib.parse import urlencode
+
+import httpx
+from packaging.version import parse
+
+from airflow import __version__ as airflow_version, settings
+from airflow.configuration import conf
+
+
+def scarf_analytics():
+    if not settings.is_telemetry_collection_enabled():
+        return
+
+    # Exclude pre-releases and dev versions
+    if _version_is_prerelease(airflow_version):
+        return
+
+    scarf_domain = "https://apacheairflow.gateway.scarf.sh/scheduler"
+
+    try:
+        platform_sys, arch = get_platform_info()
+
+        params = {
+            "version": airflow_version,
+            "python_version": get_python_version(),
+            "platform": platform_sys,
+            "arch": arch,
+            "database": get_database_name(),
+            "db_version": get_database_version(),
+            "executor": get_executor(),
+        }
+
+        query_string = urlencode(params)
+        scarf_url = f"{scarf_domain}?{query_string}"
+
+        httpx.get(scarf_url, timeout=5.0)
+    except Exception:
+        pass
+
+
+def _version_is_prerelease(version: str) -> bool:
+    return parse(version).is_prerelease
+
+
+def get_platform_info() -> tuple[str, str]:
+    return platform.system(), platform.machine()
+
+
+def get_database_version() -> str:
+    if settings.engine is None:
+        return "None"
+
+    version_info = settings.engine.dialect.server_version_info
+    # Example: (1, 2, 3) -> "1.2.3"
+    return ".".join(map(str, version_info)) if version_info else "None"
+
+
+def get_database_name() -> str:
+    if settings.engine is None:
+        return "None"
+    return settings.engine.dialect.name
+
+
+def get_executor() -> str:
+    return conf.get("core", "EXECUTOR")
+
+
+def get_python_version() -> str:
+    return platform.python_version()
diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html
index 1bb5ac25ab..29b97f8fd2 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -490,4 +490,7 @@
       return false;
     }
   </script>
+  {% if scarf_url %}
+    <img referrerpolicy="no-referrer-when-downgrade" src="{{ scarf_url }}" width="0" height="0" alt="" style="display:none;" />
+  {% endif %}
 {% endblock %}
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 26c4d8ff1a..606d48e99c 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -117,7 +117,7 @@ from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
 from airflow.timetables._cron import CronMixin
 from airflow.timetables.base import DataInterval, TimeRestriction
 from airflow.timetables.simple import ContinuousTimetable
-from airflow.utils import json as utils_json, timezone, yaml
+from airflow.utils import json as utils_json, scarf, timezone, yaml
 from airflow.utils.airflow_flask_app import get_airflow_app
 from airflow.utils.dag_edges import dag_edges
 from airflow.utils.db import get_query_count
@@ -217,6 +217,32 @@ def get_safe_url(url):
     return redirect_url.geturl()


+def build_scarf_url(dags_count: int) -> str:
+    """Build the URL for the Scarf telemetry collection."""
+    if not settings.is_telemetry_collection_enabled():
+        return ""
+
+    scarf_domain = "https://apacheairflow.gateway.scarf.sh"
+
+    platform_sys, platform_arch = scarf.get_platform_info()
+    db_version = scarf.get_database_version()
+    db_name = scarf.get_database_name()
+    executor = scarf.get_executor()
+    python_version = scarf.get_python_version()
+
+    # Path Format:
+    # /{version}/{python_version}/{platform}/{arch}/{database}/{db_version}/{executor}/{num_dags}
+    #
+    # This path redirects to a Pixel tracking URL
+    scarf_url = (
+        f"{scarf_domain}/webserver"
+        f"/{version}/{python_version}"
+        f"/{platform_sys}/{platform_arch}/{db_name}/{db_version}/{executor}/{dags_count}"
+    )
+
+    return scarf_url
+
+
 def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
     """Get Execution Data, Base Date & Number of runs from a Request."""
     date_time = www_request.args.get("execution_date")
@@ -1034,6 +1060,11 @@ class Airflow(AirflowBaseView):
                     "warning",
                 )

+        try:
+            scarf_url = build_scarf_url(dags_count=all_dags_count)
+        except Exception:
+            scarf_url = ""
+
         return self.render_template(
             "airflow/dags.html",
             dags=dags,
@@ -1072,6 +1103,7 @@ class Airflow(AirflowBaseView):
             sorting_direction=arg_sorting_direction,
             auto_refresh_interval=conf.getint("webserver", "auto_refresh_interval"),
             dataset_triggered_next_run_info=dataset_triggered_next_run_info,
+            scarf_url=scarf_url,
         )

     @expose("/datasets")
diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst
index 9643663eb7..31ec98b9ff 100644
--- a/docs/apache-airflow/faq.rst
+++ b/docs/apache-airflow/faq.rst
@@ -522,3 +522,27 @@ This means ``explicit_defaults_for_timestamp`` is disabled in your mysql server
 #. Set ``explicit_defaults_for_timestamp = 1`` under the ``mysqld`` section in your ``my.cnf`` file.
 #. Restart the Mysql server.
+
+Does Airflow collect any telemetry data?
+----------------------------------------
+
+.. _airflow-telemetry-faq:
+
+Airflow integrates `Scarf <https://about.scarf.sh/>`__ to collect basic telemetry data during operation.
+This data assists Airflow maintainers in better understanding how Airflow is used.
+Insights gained from this telemetry are critical for prioritizing patches, minor releases, and
+security fixes. Additionally, this information supports key decisions related to the development road map.
+
+Deployments can opt out of analytics by setting the :ref:`[telemetry_collection] enabled <config:telemetry_collection__enabled>`
+option to ``False``, or the ``SCARF_ANALYTICS=false`` environment variable.
+Individual users can easily opt out of analytics in various ways documented in the
+`Scarf Do Not Track docs <https://docs.scarf.sh/gateway/#do-not-track>`__.
+
+The telemetry data collected is limited to the following:
+
+- Airflow version
+- Python version
+- Operating system & machine architecture
+- Executor
+- Metadata DB type & its version
+- Number of DAGs
diff --git a/docs/apache-airflow/installation/installing-from-pypi.rst b/docs/apache-airflow/installation/installing-from-pypi.rst
index bd4ecbcbe1..4751b54112 100644
--- a/docs/apache-airflow/installation/installing-from-pypi.rst
+++ b/docs/apache-airflow/installation/installing-from-pypi.rst
@@ -331,6 +331,12 @@ dependencies compatible with just airflow core at the moment Airflow was release
     pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

+.. note::
+
+   Airflow uses `Scarf <https://about.scarf.sh/>`__ to collect basic telemetry data during operation.
+   Check the :ref:`Airflow telemetry FAQ <airflow-telemetry-faq>` for more information about the data collected
+   and how to opt out.
+
 Troubleshooting
 '''''''''''''''
diff --git a/tests/core/test_settings.py b/tests/core/test_settings.py
index 5eac456103..c2b4938421 100644
--- a/tests/core/test_settings.py
+++ b/tests/core/test_settings.py
@@ -28,7 +28,7 @@ import pytest

 from airflow.api_internal.internal_api_call import InternalApiConfig
 from airflow.exceptions import AirflowClusterPolicyViolation, AirflowConfigException
-from airflow.settings import _ENABLE_AIP_44, TracebackSession
+from airflow.settings import _ENABLE_AIP_44, TracebackSession, is_telemetry_collection_enabled
 from airflow.utils.session import create_session
 from tests.test_utils.config import conf_vars
@@ -324,3 +324,26 @@ def test_create_session_ctx_mgr_no_call_methods(mock_new, clear_internal_api):
     assert session == m
     method_calls = [x[0] for x in m.method_calls]
     assert method_calls == []  # commit and close not called when using internal API
+
+
[email protected](
+    "env_var, conf_setting, is_enabled",
+    [
+        ("false", "True", False),  # env forces disable
+        ("false", "False", False),  # Both force disable
+        ("False ", "False", False),  # Both force disable
+        ("true", "True", True),  # Both enable
+        ("true", "False", False),  # Conf forces disable
+        (None, "True", True),  # Default env, conf enables
+        (None, "False", False),  # Default env, conf disables
+    ],
+)
+def test_telemetry_collection_disabled(env_var, conf_setting, is_enabled):
+    conf_patch = conf_vars({("telemetry_collection", "enabled"): conf_setting})
+
+    if env_var is not None:
+        with conf_patch, patch.dict(os.environ, {"SCARF_ANALYTICS": env_var}):
+            assert is_telemetry_collection_enabled() == is_enabled
+    else:
+        with conf_patch:
+            assert is_telemetry_collection_enabled() == is_enabled
diff --git a/tests/utils/test_scarf.py b/tests/utils/test_scarf.py
new file mode 100644
index 0000000000..507ce0357b
--- /dev/null
+++ b/tests/utils/test_scarf.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import platform
+from unittest import mock
+
+import pytest
+
+from airflow import __version__ as airflow_version
+from airflow.configuration import conf
+from airflow.utils.scarf import get_database_version, scarf_analytics
+
+
[email protected]("is_enabled, is_prerelease", [(False, True), (True,
True)])
[email protected]("httpx.get")
+def test_scarf_analytics_disabled(mock_get, is_enabled, is_prerelease):
+ with mock.patch("airflow.settings.is_telemetry_collection_enabled",
return_value=is_enabled), mock.patch(
+ "airflow.utils.scarf._version_is_prerelease",
return_value=is_prerelease
+ ):
+ scarf_analytics()
+ mock_get.assert_not_called()
+
+
[email protected]("airflow.settings.is_telemetry_collection_enabled",
return_value=True)
[email protected]("airflow.utils.scarf._version_is_prerelease", return_value=False)
[email protected]("airflow.utils.scarf.get_database_version", return_value="12.3")
[email protected]("airflow.utils.scarf.get_database_name", return_value="postgres")
[email protected]("httpx.get")
+def test_scarf_analytics(
+ mock_get,
+ mock_is_telemetry_collection_enabled,
+ mock_version_is_prerelease,
+ get_database_version,
+ get_database_name,
+):
+ platform_sys = platform.system()
+ platform_machine = platform.machine()
+ python_version = platform.python_version()
+ executor = conf.get("core", "EXECUTOR")
+ scarf_endpoint = "https://apacheairflow.gateway.scarf.sh/scheduler"
+ scarf_analytics()
+
+ expected_scarf_url = (
+ f"{scarf_endpoint}?version={airflow_version}"
+ f"&python_version={python_version}"
+ f"&platform={platform_sys}"
+ f"&arch={platform_machine}"
+ f"&database=postgres"
+ f"&db_version=12.3"
+ f"&executor={executor}"
+ )
+
+ mock_get.assert_called_once_with(expected_scarf_url, timeout=5.0)
+
+
[email protected]_test
[email protected](
+ "version_info, expected_version",
+ [
+ ((1, 2, 3), "1.2.3"), # Normal version tuple
+ (None, "None"), # No version info available
+ ((1,), "1"), # Single element version tuple
+ ((1, 2, 3, "beta", 4), "1.2.3.beta.4"), # Complex version tuple with
strings
+ ],
+)
+def test_get_database_version(version_info, expected_version):
+ with mock.patch("airflow.settings.engine.dialect.server_version_info",
new=version_info):
+ assert get_database_version() == expected_version
diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py
index 27f096403f..527e3ff5e4 100644
--- a/tests/www/views/test_views.py
+++ b/tests/www/views/test_views.py
@@ -20,9 +20,11 @@ from __future__ import annotations

 import os
 import re
 from unittest import mock
+from unittest.mock import patch

 import pytest

+from airflow import __version__ as airflow_version
 from airflow.configuration import (
     initialize_config,
     write_default_airflow_configuration_if_needed,
@@ -31,6 +33,7 @@ from airflow.configuration import (
 from airflow.plugins_manager import AirflowPlugin, EntryPointSource
 from airflow.utils.task_group import TaskGroup
 from airflow.www.views import (
+    build_scarf_url,
     get_key_paths,
     get_safe_url,
     get_task_stats_from_query,
@@ -525,3 +528,30 @@ def test_invalid_dates(app, admin_client, url, content):
     assert resp.status_code == 400
     assert re.search(content, resp.get_data().decode())
+
+
[email protected]("enabled, dags_count", [(False, 5), (True, 5)])
+@patch("airflow.utils.scarf.get_platform_info", return_value=("Linux", "x86_64"))
+@patch("airflow.utils.scarf.get_database_version", return_value="12.3")
+@patch("airflow.utils.scarf.get_database_name", return_value="postgres")
+@patch("airflow.utils.scarf.get_executor", return_value="SequentialExecutor")
+@patch("airflow.utils.scarf.get_python_version", return_value="3.8.5")
+def test_build_scarf_url(
+    get_platform_info,
+    get_database_version,
+    get_database_name,
+    get_executor,
+    get_python_version,
+    enabled,
+    dags_count,
+):
+    with patch("airflow.settings.is_telemetry_collection_enabled", return_value=enabled):
+        result = build_scarf_url(dags_count)
+        expected_url = (
+            "https://apacheairflow.gateway.scarf.sh/webserver/"
+            f"{airflow_version}/3.8.5/Linux/x86_64/postgres/12.3/SequentialExecutor/5"
+        )
+        if enabled:
+            assert result == expected_url
+        else:
+            assert result == ""
diff --git a/tests/www/views/test_views_home.py b/tests/www/views/test_views_home.py
index 5ddcb65a87..52011c96cf 100644
--- a/tests/www/views/test_views_home.py
+++ b/tests/www/views/test_views_home.py
@@ -451,3 +451,17 @@ def test_sorting_home_view(url, lower_key, greater_key, user_client, working_dag
     lower_index = resp_html.find(lower_key)
     greater_index = resp_html.find(greater_key)
     assert lower_index < greater_index
+
+
[email protected]("is_enabled, should_have_pixel", [(False, False), (True, True)])
+def test_analytics_pixel(user_client, is_enabled, should_have_pixel):
+    """
+    Test that the analytics pixel is not included when the feature is disabled
+    """
+    with mock.patch("airflow.settings.is_telemetry_collection_enabled", return_value=is_enabled):
+        resp = user_client.get("home", follow_redirects=True)
+
+    if should_have_pixel:
+        check_content_in_response("apacheairflow.gateway.scarf.sh", resp)
+    else:
+        check_content_not_in_response("apacheairflow.gateway.scarf.sh", resp)
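As a closing illustrative sketch (assuming an installation that includes this
commit, i.e. 2.10.0+), a deployment can check whether the ping will fire by
calling the new settings helper directly:

```python
# Assumes apache-airflow with this commit applied (2.10.0+).
from airflow import settings

if settings.is_telemetry_collection_enabled():
    print("Scarf telemetry enabled; set SCARF_ANALYTICS=false to opt out.")
else:
    print("Scarf telemetry disabled.")
```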