This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 647e76716e Add distinct function to MongoHook in
apache-airflow-providers-mongo (#34466)
647e76716e is described below
commit 647e76716e38e07b71c8b7dedc4d3955aff110fb
Author: Octávio Lage <[email protected]>
AuthorDate: Wed Oct 25 13:52:01 2023 -0300
Add distinct function to MongoHook in apache-airflow-providers-mongo
(#34466)
---
airflow/providers/mongo/hooks/mongo.py | 24 ++++++++++++++++++++++++
tests/providers/mongo/hooks/test_mongo.py | 26 ++++++++++++++++++++++++++
2 files changed, 50 insertions(+)
diff --git a/airflow/providers/mongo/hooks/mongo.py
b/airflow/providers/mongo/hooks/mongo.py
index fca855a51d..928618a9dc 100644
--- a/airflow/providers/mongo/hooks/mongo.py
+++ b/airflow/providers/mongo/hooks/mongo.py
@@ -366,3 +366,27 @@ class MongoHook(BaseHook):
collection = self.get_collection(mongo_collection, mongo_db=mongo_db)
return collection.delete_many(filter_doc, **kwargs)
+
+ def distinct(
+ self,
+ mongo_collection: str,
+ distinct_key: str,
+ filter_doc: dict | None = None,
+ mongo_db: str | None = None,
+ **kwargs,
+ ) -> list[Any]:
+ """
+ Returns a list of distinct values for the given key across a collection.
+
+ https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.distinct
+
+ :param mongo_collection: The name of the collection to perform distinct on.
+ :param distinct_key: The field to return distinct values from.
+ :param filter_doc: A query that matches the documents to get distinct values from.
+ Can be omitted; then the entire collection is covered.
+ :param mongo_db: The name of the database to use.
+ Can be omitted; then the database from the connection string is used.
+ """
+ collection = self.get_collection(mongo_collection, mongo_db=mongo_db)
+
+ return collection.distinct(distinct_key, filter=filter_doc, **kwargs)
diff --git a/tests/providers/mongo/hooks/test_mongo.py
b/tests/providers/mongo/hooks/test_mongo.py
index 4d46a613e7..19aa4928fc 100644
--- a/tests/providers/mongo/hooks/test_mongo.py
+++ b/tests/providers/mongo/hooks/test_mongo.py
@@ -303,6 +303,32 @@ class TestMongoHook:
results = self.hook.aggregate(collection, aggregate_query)
assert len(list(results)) == 2
+ def test_distinct(self):
+ collection = mongomock.MongoClient().db.collection
+ objs = [
+ {"test_id": "1", "test_status": "success"},
+ {"test_id": "2", "test_status": "failure"},
+ {"test_id": "3", "test_status": "success"},
+ ]
+
+ collection.insert_many(objs)
+
+ results = self.hook.distinct(collection, "test_status")
+ assert len(results) == 2  # the fixture holds two distinct statuses: "success" and "failure"
+
+ def test_distinct_with_filter(self):
+ collection = mongomock.MongoClient().db.collection
+ objs = [
+ {"test_id": "1", "test_status": "success"},
+ {"test_id": "2", "test_status": "failure"},
+ {"test_id": "3", "test_status": "success"},
+ ]
+
+ collection.insert_many(objs)
+
+ results = self.hook.distinct(collection, "test_id", {"test_status": "failure"})
+ assert len(results) == 1  # only the document with test_id "2" matches the "failure" filter
+
def test_context_manager():
with MongoHook(conn_id="mongo_default", mongo_db="default") as ctx_hook: