This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ecbf02386a Add basic metrics to stats collector. (#35368)
ecbf02386a is described below
commit ecbf02386a2ef7e12d1a7846a6dda1d8a9aff8ab
Author: Jakub Dardzinski <[email protected]>
AuthorDate: Tue Nov 21 06:45:04 2023 +0100
Add basic metrics to stats collector. (#35368)
Signed-off-by: Jakub Dardzinski <[email protected]>
Co-authored-by: Elad Kalif <[email protected]>
---
airflow/providers/openlineage/plugins/adapter.py | 5 +-
.../plugins/test_openlineage_adapter.py | 75 +++++++++++++++++++---
2 files changed, 69 insertions(+), 11 deletions(-)
diff --git a/airflow/providers/openlineage/plugins/adapter.py
b/airflow/providers/openlineage/plugins/adapter.py
index fabb14eaa3..a925ddf8e6 100644
--- a/airflow/providers/openlineage/plugins/adapter.py
+++ b/airflow/providers/openlineage/plugins/adapter.py
@@ -38,6 +38,7 @@ from openlineage.client.run import Job, Run, RunEvent,
RunState
from airflow.configuration import conf
from airflow.providers.openlineage import __version__ as
OPENLINEAGE_PROVIDER_VERSION
from airflow.providers.openlineage.utils.utils import OpenLineageRedactor
+from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
if TYPE_CHECKING:
@@ -113,8 +114,10 @@ class OpenLineageAdapter(LoggingMixin):
self._client = self.get_or_create_openlineage_client()
redacted_event: RunEvent = self._redacter.redact(event, max_depth=20)
# type: ignore[assignment]
try:
- return self._client.emit(redacted_event)
+ with Stats.timer("ol.emit.attempts"):
+ return self._client.emit(redacted_event)
except Exception as e:
+ Stats.incr("ol.emit.failed")
self.log.warning("Failed to emit OpenLineage event of id %s",
event.run.runId)
self.log.debug("OpenLineage emission failure: %s", e)
diff --git a/tests/providers/openlineage/plugins/test_openlineage_adapter.py
b/tests/providers/openlineage/plugins/test_openlineage_adapter.py
index 685e88c725..bcb92b2b9b 100644
--- a/tests/providers/openlineage/plugins/test_openlineage_adapter.py
+++ b/tests/providers/openlineage/plugins/test_openlineage_adapter.py
@@ -39,7 +39,10 @@ from tests.test_utils.config import conf_vars
pytestmark = pytest.mark.db_test
[email protected](os.environ, {"OPENLINEAGE_URL": "http://ol-api:5000",
"OPENLINEAGE_API_KEY": "api-key"})
[email protected](
+ os.environ,
+ {"OPENLINEAGE_URL": "http://ol-api:5000", "OPENLINEAGE_API_KEY":
"api-key"},
+)
def test_create_client_from_ol_env():
client = OpenLineageAdapter().get_or_create_openlineage_client()
@@ -90,7 +93,11 @@ def test_create_client_from_env_var_config():
@patch.dict(
- os.environ, {"OPENLINEAGE_URL": "http://ol-from-env:5000",
"OPENLINEAGE_API_KEY": "api-key-from-env"}
+ os.environ,
+ {
+ "OPENLINEAGE_URL": "http://ol-from-env:5000",
+ "OPENLINEAGE_API_KEY": "api-key-from-env",
+ },
)
@patch.dict(os.environ, {"OPENLINEAGE_CONFIG": "some/config.yml"})
def test_create_client_overrides_env_vars():
@@ -108,7 +115,9 @@ def test_create_client_overrides_env_vars():
assert client.transport.kind == "console"
-def test_emit_start_event():
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.timer")
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_start_event(mock_stats_incr, mock_stats_timer):
client = MagicMock()
adapter = OpenLineageAdapter(client)
@@ -138,7 +147,8 @@ def test_emit_start_event():
runId=run_id,
facets={
"nominalTime": NominalTimeRunFacet(
- nominalStartTime="2022-01-01T00:00:00",
nominalEndTime="2022-01-01T00:00:00"
+ nominalStartTime="2022-01-01T00:00:00",
+ nominalEndTime="2022-01-01T00:00:00",
),
"processing_engine": ProcessingEngineRunFacet(
version=ANY, name="Airflow",
openlineageAdapterVersion=ANY
@@ -158,8 +168,13 @@ def test_emit_start_event():
in client.emit.mock_calls
)
+ mock_stats_incr.assert_not_called()
+ mock_stats_timer.assert_called_with("ol.emit.attempts")
+
-def test_emit_complete_event():
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.timer")
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_complete_event(mock_stats_incr, mock_stats_timer):
client = MagicMock()
adapter = OpenLineageAdapter(client)
@@ -187,8 +202,13 @@ def test_emit_complete_event():
in client.emit.mock_calls
)
+ mock_stats_incr.assert_not_called()
+ mock_stats_timer.assert_called_with("ol.emit.attempts")
-def test_emit_failed_event():
+
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.timer")
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_failed_event(mock_stats_incr, mock_stats_timer):
client = MagicMock()
adapter = OpenLineageAdapter(client)
@@ -216,9 +236,14 @@ def test_emit_failed_event():
in client.emit.mock_calls
)
+ mock_stats_incr.assert_not_called()
+ mock_stats_timer.assert_called_with("ol.emit.attempts")
+
@mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
-def test_emit_dag_started_event(uuid):
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.timer")
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_dag_started_event(mock_stats_incr, mock_stats_timer, uuid):
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
adapter = OpenLineageAdapter(client)
@@ -248,7 +273,8 @@ def test_emit_dag_started_event(uuid):
runId=random_uuid,
facets={
"nominalTime": NominalTimeRunFacet(
- nominalStartTime=event_time.isoformat(),
nominalEndTime=event_time.isoformat()
+ nominalStartTime=event_time.isoformat(),
+ nominalEndTime=event_time.isoformat(),
)
},
),
@@ -261,9 +287,14 @@ def test_emit_dag_started_event(uuid):
in client.emit.mock_calls
)
+ mock_stats_incr.assert_not_called()
+ mock_stats_timer.assert_called_with("ol.emit.attempts")
+
@mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
-def test_emit_dag_complete_event(uuid):
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.timer")
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_dag_complete_event(mock_stats_incr, mock_stats_timer, uuid):
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
adapter = OpenLineageAdapter(client)
@@ -298,9 +329,14 @@ def test_emit_dag_complete_event(uuid):
in client.emit.mock_calls
)
+ mock_stats_incr.assert_not_called()
+ mock_stats_timer.assert_called_with("ol.emit.attempts")
+
@mock.patch("airflow.providers.openlineage.plugins.adapter.uuid")
-def test_emit_dag_failed_event(uuid):
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.timer")
[email protected]("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_emit_dag_failed_event(mock_stats_incr, mock_stats_timer, uuid):
random_uuid = "9d3b14f7-de91-40b6-aeef-e887e2c7673e"
client = MagicMock()
adapter = OpenLineageAdapter(client)
@@ -341,3 +377,22 @@ def test_emit_dag_failed_event(uuid):
)
in client.emit.mock_calls
)
+
+ mock_stats_incr.assert_not_called()
+ mock_stats_timer.assert_called_with("ol.emit.attempts")
+
+
+@patch("airflow.providers.openlineage.plugins.adapter.OpenLineageAdapter.get_or_create_openlineage_client")
+@patch("airflow.providers.openlineage.plugins.adapter.OpenLineageRedactor")
+@patch("airflow.providers.openlineage.plugins.adapter.Stats.timer")
+@patch("airflow.providers.openlineage.plugins.adapter.Stats.incr")
+def test_openlineage_adapter_stats_emit_failed(
+ mock_stats_incr, mock_stats_timer, mock_redact, mock_get_client
+):
+ adapter = OpenLineageAdapter()
+ mock_get_client.return_value.emit.side_effect = Exception()
+
+ adapter.emit(MagicMock())
+
+ mock_stats_timer.assert_called_with("ol.emit.attempts")
+ mock_stats_incr.assert_has_calls([mock.call("ol.emit.failed")])