This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cd0c6a7e77 Add Scarf based telemetry (#39510)
cd0c6a7e77 is described below

commit cd0c6a7e77fd687e9fdce74afa064199258ceeef
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu May 16 21:56:33 2024 +0100

    Add Scarf based telemetry (#39510)
    
    To take [the discussion about adding 
Telemetry](https://lists.apache.org/thread/7f6qyr8w2n8w34g63s7ybhzphgt8h43m) 
forward, I am creating a draft PR that adds some basic telemetry to send to 
Scarf.
    
    Voting thread: 
https://lists.apache.org/thread/h1x2glvnd42rbj2q2rgpfo3pjhmpt307
    
    I have added docs on the data collection as well as a way to opt out of it.
    
    Telemetry added at:
    - Scheduler startup [Custom 
telemetry](https://docs.scarf.sh/custom-telemetry/) similar to other popular 
projects like 
[Unstructured](https://github.com/Unstructured-IO/unstructured/commit/f0a63e27122972901f72c42b4b5b9eb9bed1ada3)
    - Webserver via a [tracking 
pixel](https://docs.scarf.sh/web-traffic/#creating-a-pixel), similar to [Apache 
Superset](https://github.com/apache/superset/pull/26011/files)
    
    Data collected:
    - Airflow version
    - Python version
    - Platform System info: Linux/Darwin
    - Machine type: arm64/aarch64
    - Airflow Metadata DB: Postgres/MySQL
    - DB: sqlite/postgres
    - DB version: 12.6.3
    - Number of DAGs
---
 airflow/cli/commands/scheduler_command.py          |  3 +
 airflow/config_templates/config.yml                | 22 ++++++
 airflow/settings.py                                |  7 ++
 airflow/utils/scarf.py                             | 89 ++++++++++++++++++++++
 airflow/www/templates/airflow/dags.html            |  3 +
 airflow/www/views.py                               | 34 ++++++++-
 docs/apache-airflow/faq.rst                        | 24 ++++++
 .../installation/installing-from-pypi.rst          |  6 ++
 tests/core/test_settings.py                        | 25 +++++-
 tests/utils/test_scarf.py                          | 84 ++++++++++++++++++++
 tests/www/views/test_views.py                      | 30 ++++++++
 tests/www/views/test_views_home.py                 | 14 ++++
 12 files changed, 339 insertions(+), 2 deletions(-)

diff --git a/airflow/cli/commands/scheduler_command.py 
b/airflow/cli/commands/scheduler_command.py
index 2a55ca2373..4f943e961b 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -33,6 +33,7 @@ from airflow.jobs.scheduler_job_runner import 
SchedulerJobRunner
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import process_subdir
 from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
+from airflow.utils.scarf import scarf_analytics
 from airflow.utils.scheduler_health import serve_health_check
 
 log = logging.getLogger(__name__)
@@ -55,6 +56,8 @@ def scheduler(args: Namespace):
     """Start Airflow Scheduler."""
     print(settings.HEADER)
 
+    scarf_analytics()
+
     run_command_with_daemon_option(
         args=args,
         process_name="scheduler",
diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index 95d83f9d4c..edfe56b45c 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2591,3 +2591,25 @@ sensors:
       type: float
       example: ~
       default: "604800"
+telemetry_collection:
+  description: |
+    Airflow integrates `Scarf <https://about.scarf.sh/>`__ to collect basic 
telemetry data during operation.
+    This data assists Airflow maintainers in better understanding how Airflow 
is used.
+    Insights gained from this telemetry are critical for prioritizing patches, 
minor releases, and
+    security fixes. Additionally, this information supports key decisions 
related to the development road map.
+    Check the FAQ doc for more information on what data is collected.
+
+    Deployments can opt out of analytics by setting the ``enabled`` option
+    to ``False``, or the ``SCARF_ANALYTICS=false`` environment variable.
+    Individual users can easily opt out of analytics in various ways 
documented in the
+    `Scarf Do Not Track docs <https://docs.scarf.sh/gateway/#do-not-track>`__.
+
+  options:
+    enabled:
+      description: |
+        Enable or disable telemetry data collection and sending via Scarf.
+      version_added: 2.10.0
+      type: boolean
+      example: ~
+      default: "True"
+      see_also: ":ref:`Airflow telemetry FAQ <airflow-telemetry-faq>`"
diff --git a/airflow/settings.py b/airflow/settings.py
index 7b8a222444..176d06270e 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -576,6 +576,13 @@ def initialize():
     atexit.register(dispose_orm)
 
 
+def is_telemetry_collection_enabled() -> bool:
+    """Check if scarf analytics is enabled."""
+    return conf.getboolean("telemetry_collection", "enabled", fallback=True) 
and (
+        os.getenv("SCARF_ANALYTICS", "").strip().lower() != "false"
+    )
+
+
 # Const stuff
 
 KILOBYTE = 1024
diff --git a/airflow/utils/scarf.py b/airflow/utils/scarf.py
new file mode 100644
index 0000000000..ec19480ee7
--- /dev/null
+++ b/airflow/utils/scarf.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import platform
+from urllib.parse import urlencode
+
+import httpx
+from packaging.version import parse
+
+from airflow import __version__ as airflow_version, settings
+from airflow.configuration import conf
+
+
+def scarf_analytics():
+    if not settings.is_telemetry_collection_enabled():
+        return
+
+    # Exclude pre-releases and dev versions
+    if _version_is_prerelease(airflow_version):
+        return
+
+    scarf_domain = "https://apacheairflow.gateway.scarf.sh/scheduler"
+
+    try:
+        platform_sys, arch = get_platform_info()
+
+        params = {
+            "version": airflow_version,
+            "python_version": get_python_version(),
+            "platform": platform_sys,
+            "arch": arch,
+            "database": get_database_name(),
+            "db_version": get_database_version(),
+            "executor": get_executor(),
+        }
+
+        query_string = urlencode(params)
+        scarf_url = f"{scarf_domain}?{query_string}"
+
+        httpx.get(scarf_url, timeout=5.0)
+    except Exception:
+        pass
+
+
+def _version_is_prerelease(version: str) -> bool:
+    return parse(version).is_prerelease
+
+
+def get_platform_info() -> tuple[str, str]:
+    return platform.system(), platform.machine()
+
+
+def get_database_version() -> str:
+    if settings.engine is None:
+        return "None"
+
+    version_info = settings.engine.dialect.server_version_info
+    # Example: (1, 2, 3) -> "1.2.3"
+    return ".".join(map(str, version_info)) if version_info else "None"
+
+
+def get_database_name() -> str:
+    if settings.engine is None:
+        return "None"
+    return settings.engine.dialect.name
+
+
+def get_executor() -> str:
+    return conf.get("core", "EXECUTOR")
+
+
+def get_python_version() -> str:
+    return platform.python_version()
diff --git a/airflow/www/templates/airflow/dags.html 
b/airflow/www/templates/airflow/dags.html
index 1bb5ac25ab..29b97f8fd2 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -490,4 +490,7 @@
       return false;
     }
   </script>
+  {% if scarf_url %}
+  <img referrerpolicy="no-referrer-when-downgrade" src="{{ scarf_url }}" 
width="0" height="0" alt="" style="display:none;" />
+  {% endif %}
 {% endblock %}
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 26c4d8ff1a..606d48e99c 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -117,7 +117,7 @@ from airflow.ti_deps.dependencies_deps import 
SCHEDULER_QUEUED_DEPS
 from airflow.timetables._cron import CronMixin
 from airflow.timetables.base import DataInterval, TimeRestriction
 from airflow.timetables.simple import ContinuousTimetable
-from airflow.utils import json as utils_json, timezone, yaml
+from airflow.utils import json as utils_json, scarf, timezone, yaml
 from airflow.utils.airflow_flask_app import get_airflow_app
 from airflow.utils.dag_edges import dag_edges
 from airflow.utils.db import get_query_count
@@ -217,6 +217,32 @@ def get_safe_url(url):
     return redirect_url.geturl()
 
 
+def build_scarf_url(dags_count: int) -> str:
+    """Build the URL for the Scarf telemetry collection."""
+    if not settings.is_telemetry_collection_enabled():
+        return ""
+
+    scarf_domain = "https://apacheairflow.gateway.scarf.sh"
+
+    platform_sys, platform_arch = scarf.get_platform_info()
+    db_version = scarf.get_database_version()
+    db_name = scarf.get_database_name()
+    executor = scarf.get_executor()
+    python_version = scarf.get_python_version()
+
+    # Path Format:
+    # 
/{version}/{python_version}/{platform}/{arch}/{database}/{db_version}/{executor}/{num_dags}
+    #
+    # This path redirects to a Pixel tracking URL
+    scarf_url = (
+        f"{scarf_domain}/webserver"
+        f"/{version}/{python_version}"
+        
f"/{platform_sys}/{platform_arch}/{db_name}/{db_version}/{executor}/{dags_count}"
+    )
+
+    return scarf_url
+
+
 def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
     """Get Execution Data, Base Date & Number of runs from a Request."""
     date_time = www_request.args.get("execution_date")
@@ -1034,6 +1060,11 @@ class Airflow(AirflowBaseView):
                     "warning",
                 )
 
+        try:
+            scarf_url = build_scarf_url(dags_count=all_dags_count)
+        except Exception:
+            scarf_url = ""
+
         return self.render_template(
             "airflow/dags.html",
             dags=dags,
@@ -1072,6 +1103,7 @@ class Airflow(AirflowBaseView):
             sorting_direction=arg_sorting_direction,
             auto_refresh_interval=conf.getint("webserver", 
"auto_refresh_interval"),
             dataset_triggered_next_run_info=dataset_triggered_next_run_info,
+            scarf_url=scarf_url,
         )
 
     @expose("/datasets")
diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst
index 9643663eb7..31ec98b9ff 100644
--- a/docs/apache-airflow/faq.rst
+++ b/docs/apache-airflow/faq.rst
@@ -522,3 +522,27 @@ This means ``explicit_defaults_for_timestamp`` is disabled 
in your mysql server
 
 #. Set ``explicit_defaults_for_timestamp = 1`` under the ``mysqld`` section in 
your ``my.cnf`` file.
 #. Restart the Mysql server.
+
+Does Airflow collect any telemetry data?
+----------------------------------------
+
+.. _airflow-telemetry-faq:
+
+Airflow integrates `Scarf <https://about.scarf.sh/>`__ to collect basic 
telemetry data during operation.
+This data assists Airflow maintainers in better understanding how Airflow is 
used.
+Insights gained from this telemetry are critical for prioritizing patches, 
minor releases, and
+security fixes. Additionally, this information supports key decisions related 
to the development road map.
+
+Deployments can opt out of analytics by setting the 
:ref:`[telemetry_collection] enabled <config:telemetry_collection__enabled>`
+option to ``False``, or the ``SCARF_ANALYTICS=false`` environment variable.
+Individual users can easily opt out of analytics in various ways documented in 
the
+`Scarf Do Not Track docs <https://docs.scarf.sh/gateway/#do-not-track>`__.
+
+The telemetry data collected is limited to the following:
+
+- Airflow version
+- Python version
+- Operating system & machine architecture
+- Executor
+- Metadata DB type & its version
+- Number of DAGs
diff --git a/docs/apache-airflow/installation/installing-from-pypi.rst 
b/docs/apache-airflow/installation/installing-from-pypi.rst
index bd4ecbcbe1..4751b54112 100644
--- a/docs/apache-airflow/installation/installing-from-pypi.rst
+++ b/docs/apache-airflow/installation/installing-from-pypi.rst
@@ -331,6 +331,12 @@ dependencies compatible with just airflow core at the 
moment Airflow was release
     pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint 
"${CONSTRAINT_URL}"
 
 
+.. note::
+
+    Airflow uses `Scarf <https://about.scarf.sh/>`__ to collect basic 
telemetry data during operation.
+    Check the :ref:`Airflow telemetry FAQ <airflow-telemetry-faq>` for more 
information about the data collected
+    and how to opt out.
+
 Troubleshooting
 '''''''''''''''
 
diff --git a/tests/core/test_settings.py b/tests/core/test_settings.py
index 5eac456103..c2b4938421 100644
--- a/tests/core/test_settings.py
+++ b/tests/core/test_settings.py
@@ -28,7 +28,7 @@ import pytest
 
 from airflow.api_internal.internal_api_call import InternalApiConfig
 from airflow.exceptions import AirflowClusterPolicyViolation, 
AirflowConfigException
-from airflow.settings import _ENABLE_AIP_44, TracebackSession
+from airflow.settings import _ENABLE_AIP_44, TracebackSession, 
is_telemetry_collection_enabled
 from airflow.utils.session import create_session
 from tests.test_utils.config import conf_vars
 
@@ -324,3 +324,26 @@ def test_create_session_ctx_mgr_no_call_methods(mock_new, 
clear_internal_api):
         assert session == m
     method_calls = [x[0] for x in m.method_calls]
     assert method_calls == []  # commit and close not called when using 
internal API
+
+
[email protected](
+    "env_var, conf_setting, is_enabled",
+    [
+        ("false", "True", False),  # env forces disable
+        ("false", "False", False),  # Both force disable
+        ("False ", "False", False),  # Both force disable
+        ("true", "True", True),  # Both enable
+        ("true", "False", False),  # Conf forces disable
+        (None, "True", True),  # Default env, conf enables
+        (None, "False", False),  # Default env, conf disables
+    ],
+)
+def test_telemetry_collection_disabled(env_var, conf_setting, is_enabled):
+    conf_patch = conf_vars({("telemetry_collection", "enabled"): conf_setting})
+
+    if env_var is not None:
+        with conf_patch, patch.dict(os.environ, {"SCARF_ANALYTICS": env_var}):
+            assert is_telemetry_collection_enabled() == is_enabled
+    else:
+        with conf_patch:
+            assert is_telemetry_collection_enabled() == is_enabled
diff --git a/tests/utils/test_scarf.py b/tests/utils/test_scarf.py
new file mode 100644
index 0000000000..507ce0357b
--- /dev/null
+++ b/tests/utils/test_scarf.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import platform
+from unittest import mock
+
+import pytest
+
+from airflow import __version__ as airflow_version
+from airflow.configuration import conf
+from airflow.utils.scarf import get_database_version, scarf_analytics
+
+
[email protected]("is_enabled, is_prerelease", [(False, True), (True, 
True)])
[email protected]("httpx.get")
+def test_scarf_analytics_disabled(mock_get, is_enabled, is_prerelease):
+    with mock.patch("airflow.settings.is_telemetry_collection_enabled", 
return_value=is_enabled), mock.patch(
+        "airflow.utils.scarf._version_is_prerelease", 
return_value=is_prerelease
+    ):
+        scarf_analytics()
+    mock_get.assert_not_called()
+
+
[email protected]("airflow.settings.is_telemetry_collection_enabled", 
return_value=True)
[email protected]("airflow.utils.scarf._version_is_prerelease", return_value=False)
[email protected]("airflow.utils.scarf.get_database_version", return_value="12.3")
[email protected]("airflow.utils.scarf.get_database_name", return_value="postgres")
[email protected]("httpx.get")
+def test_scarf_analytics(
+    mock_get,
+    mock_is_telemetry_collection_enabled,
+    mock_version_is_prerelease,
+    get_database_version,
+    get_database_name,
+):
+    platform_sys = platform.system()
+    platform_machine = platform.machine()
+    python_version = platform.python_version()
+    executor = conf.get("core", "EXECUTOR")
+    scarf_endpoint = "https://apacheairflow.gateway.scarf.sh/scheduler"
+    scarf_analytics()
+
+    expected_scarf_url = (
+        f"{scarf_endpoint}?version={airflow_version}"
+        f"&python_version={python_version}"
+        f"&platform={platform_sys}"
+        f"&arch={platform_machine}"
+        f"&database=postgres"
+        f"&db_version=12.3"
+        f"&executor={executor}"
+    )
+
+    mock_get.assert_called_once_with(expected_scarf_url, timeout=5.0)
+
+
[email protected]_test
[email protected](
+    "version_info, expected_version",
+    [
+        ((1, 2, 3), "1.2.3"),  # Normal version tuple
+        (None, "None"),  # No version info available
+        ((1,), "1"),  # Single element version tuple
+        ((1, 2, 3, "beta", 4), "1.2.3.beta.4"),  # Complex version tuple with 
strings
+    ],
+)
+def test_get_database_version(version_info, expected_version):
+    with mock.patch("airflow.settings.engine.dialect.server_version_info", 
new=version_info):
+        assert get_database_version() == expected_version
diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py
index 27f096403f..527e3ff5e4 100644
--- a/tests/www/views/test_views.py
+++ b/tests/www/views/test_views.py
@@ -20,9 +20,11 @@ from __future__ import annotations
 import os
 import re
 from unittest import mock
+from unittest.mock import patch
 
 import pytest
 
+from airflow import __version__ as airflow_version
 from airflow.configuration import (
     initialize_config,
     write_default_airflow_configuration_if_needed,
@@ -31,6 +33,7 @@ from airflow.configuration import (
 from airflow.plugins_manager import AirflowPlugin, EntryPointSource
 from airflow.utils.task_group import TaskGroup
 from airflow.www.views import (
+    build_scarf_url,
     get_key_paths,
     get_safe_url,
     get_task_stats_from_query,
@@ -525,3 +528,30 @@ def test_invalid_dates(app, admin_client, url, content):
 
     assert resp.status_code == 400
     assert re.search(content, resp.get_data().decode())
+
+
[email protected]("enabled, dags_count", [(False, 5), (True, 5)])
+@patch("airflow.utils.scarf.get_platform_info", return_value=("Linux", 
"x86_64"))
+@patch("airflow.utils.scarf.get_database_version", return_value="12.3")
+@patch("airflow.utils.scarf.get_database_name", return_value="postgres")
+@patch("airflow.utils.scarf.get_executor", return_value="SequentialExecutor")
+@patch("airflow.utils.scarf.get_python_version", return_value="3.8.5")
+def test_build_scarf_url(
+    get_platform_info,
+    get_database_version,
+    get_database_name,
+    get_executor,
+    get_python_version,
+    enabled,
+    dags_count,
+):
+    with patch("airflow.settings.is_telemetry_collection_enabled", 
return_value=enabled):
+        result = build_scarf_url(dags_count)
+        expected_url = (
            "https://apacheairflow.gateway.scarf.sh/webserver/"
+            
f"{airflow_version}/3.8.5/Linux/x86_64/postgres/12.3/SequentialExecutor/5"
+        )
+        if enabled:
+            assert result == expected_url
+        else:
+            assert result == ""
diff --git a/tests/www/views/test_views_home.py 
b/tests/www/views/test_views_home.py
index 5ddcb65a87..52011c96cf 100644
--- a/tests/www/views/test_views_home.py
+++ b/tests/www/views/test_views_home.py
@@ -451,3 +451,17 @@ def test_sorting_home_view(url, lower_key, greater_key, 
user_client, working_dag
     lower_index = resp_html.find(lower_key)
     greater_index = resp_html.find(greater_key)
     assert lower_index < greater_index
+
+
[email protected]("is_enabled, should_have_pixel", [(False, False), 
(True, True)])
+def test_analytics_pixel(user_client, is_enabled, should_have_pixel):
+    """
+    Test that the analytics pixel is not included when the feature is disabled
+    """
+    with mock.patch("airflow.settings.is_telemetry_collection_enabled", 
return_value=is_enabled):
+        resp = user_client.get("home", follow_redirects=True)
+
+    if should_have_pixel:
+        check_content_in_response("apacheairflow.gateway.scarf.sh", resp)
+    else:
+        check_content_not_in_response("apacheairflow.gateway.scarf.sh", resp)

Reply via email to