sjyangkevin commented on code in PR #50735:
URL: https://github.com/apache/airflow/pull/50735#discussion_r2099206936
##########
providers/weaviate/src/airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -191,6 +192,70 @@ def get_collection(self, name: str) -> Collection:
client = self.conn
return client.collections.get(name)
+ def delete_by_property(
+ self,
+ *,
+ collection_names: list[str] | str,
+ filter_criteria: _Filters,
+ if_error: str = "stop",
+ dry_run: bool = False,
+ verbose: bool = False,
+ ) -> list[str] | None:
+ """
+ Delete objects in collections using a provided Filter object.
+
+ :param collection_names: The name(s) of the collection(s) to delete
from.
+ :param filter_criteria: A `Filter` object defining the filter criteria
for deletion.
+ :param if_error: define the actions to be taken if there is an error
while deleting objects, possible
+ options are `stop` and `continue`
+ :param dry_run: Use 'dry_run' to check how many objects would be
deleted, without actually performing the deletion.
+ :param verbose: Set output to 'verbose' to see more details (ID and
deletion status) for each deletion
+ :return: If `if_error="continue"`, returns list of failed collection
names. Else, returns None.
+
+ Example:
+ >>> from weaviate.classes.query import Filter
+ >>> my_filter = (
+ >>> Filter.by_property("round").equal("Double Jeopardy!") &
+ >>> Filter.by_property("points").less_than(600)
+ >>> )
+ >>> delete_by_filter(
+ >>> collection_names=["collection_a", "collection_b"],
+ >>> filter_criteria=my_filter,
+ >>> if_error="stop"
+ >>> )
+ """
+ collection_names = [collection_names] if isinstance(collection_names,
str) else collection_names
+
+ failed_collection_list = []
+ for collection_name in collection_names:
+ try:
+ self.log.info("Attempting to delete objects from '%s'",
collection_name)
+
+ for attempt in Retrying(
+ stop=stop_after_attempt(3),
+ retry=(
+ retry_if_exception(lambda exc:
check_http_error_is_retryable(exc))
+ | retry_if_exception_type(REQUESTS_EXCEPTIONS_TYPES)
+ ),
+ ):
+ with attempt:
+ self.log.info(attempt)
+ collection = self.get_collection(collection_name)
+ collection.data.delete_many(where=filter_criteria,
verbose=verbose, dry_run=dry_run)
+ except (
+ weaviate.exceptions.UnexpectedStatusCodeException,
+ weaviate.exceptions.WeaviateDeleteManyError,
+ ) as e:
+ if if_error == "continue":
+ self.log.error(e)
+ failed_collection_list.append(collection_name)
+ elif if_error == "stop":
+ raise e
Review Comment:
I am facing a trade off here. I can define a function, just wondering if
that is necessary, or there is a better way to keep code DRY.
### Option 1
```python
except (
weaviate.exceptions.UnexpectedStatusCodeException,
weaviate.exceptions.WeaviateDeleteManyError,
Exception # capture generic exception, but the above two can also be
captured by this, keeping for better readability
) as e:
if if_error == "continue":
self.log.error(e)
failed_collection_list.append(collection_name)
else:
raise e
```
### Option 2
```python
except (
weaviate.exceptions.UnexpectedStatusCodeException,
weaviate.exceptions.WeaviateDeleteManyError
) as e:
if if_error == "continue":
self.log.error(e)
failed_collection_list.append(collection_name)
else:
raise e
except Exception as e: # use another except block, but code don't follow DRY
if if_error == "continue":
self.log.error(e)
failed_collection_list.append(collection_name)
else:
raise e
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]