pankajkoti commented on code in PR #35934:
URL: https://github.com/apache/airflow/pull/35934#discussion_r1417346501
##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -200,3 +209,147 @@ def query_without_vector(
.do()
)
return results
+
+ def create_object(
+ self, data_object: dict | str, class_name: str, **kwargs
+ ) -> str | dict[str, Any] | None:
+ """Create a new object.
+
+ :param data_object: Object to be added. If type is str it should be
either a URL or a file.
+ :param class_name: Class name associated with the object given.
+ :param kwargs: Additional parameters to be passed to
weaviate_client.data_object.create()
+ """
+ client = self.conn
+ # generate deterministic uuid if not provided
+ uuid = kwargs.pop("uuid", generate_uuid5(data_object))
+ try:
+ return client.data_object.create(data_object, class_name,
uuid=uuid, **kwargs)
+ except ObjectAlreadyExistsException:
+ self.log.warning("Object with the UUID %s already exists", uuid)
+ return None
+
+ def get_or_create_object(
+ self,
+ data_object: dict | str | None = None,
+ class_name: str | None = None,
+ vector: Sequence | None = None,
+ consistency_level: ConsistencyLevel | None = None,
+ tenant: str | None = None,
+ **kwargs,
+ ) -> str | dict[str, Any] | None:
+ """Get or Create a new object.
+
+ Returns the object if already exists
+
+ :param data_object: Object to be added. If type is str it should be
either a URL or a file. This is required
+ to create a new object.
+ :param class_name: Class name associated with the object given. This
is required to create a new object.
+ :param vector: Vector associated with the object given. This argument
is only used when creating object.
+ :param consistency_level: Consistency level to be used. Applies to
both create and get operations.
+ :tenant: Tenant to be used. Applies to both create and get operations.
+ :param kwargs: Additional parameters to be passed to
weaviate_client.data_object.create() and
+ weaviate_client.data_object.get()
+ """
+ obj = self.get_object(
+ class_name=class_name, consistency_level=consistency_level,
tenant=tenant, **kwargs
+ )
+ if not obj:
+ if not (data_object and class_name):
+ raise ValueError("data_object and class_name are required to
create a new object")
+ uuid = kwargs.pop("uuid", generate_uuid5(data_object))
+ return self.create_object(
+ data_object,
+ class_name,
+ vector=vector,
+ uuid=uuid,
+ consistency_level=consistency_level,
+ tenant=tenant,
+ )
+ return obj
+
+ def get_object(self, **kwargs) -> dict[str, Any] | None:
+ """Get objects or an object from weaviate.
+
+ :param kwargs: parameters to be passed to
weaviate_client.data_object.get() or
+ weaviate_client.data_object.get_by_id()
+ """
+ client = self.conn
+ return client.data_object.get(**kwargs)
+
+ def get_all_objects(
Review Comment:
Do we need to expose such a method? I am worried that if there are millions of
objects, it may cause OOM or other errors. It would be nicer to return paged
responses, e.g. 100 objects at a time, since returning all the objects at once could
be expensive. I would prefer to remove this method or, if it is really needed,
have it return paged responses.
##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -200,3 +209,147 @@ def query_without_vector(
.do()
)
return results
+
+ def create_object(
+ self, data_object: dict | str, class_name: str, **kwargs
+ ) -> str | dict[str, Any] | None:
+ """Create a new object.
+
+ :param data_object: Object to be added. If type is str it should be
either a URL or a file.
+ :param class_name: Class name associated with the object given.
+ :param kwargs: Additional parameters to be passed to
weaviate_client.data_object.create()
+ """
+ client = self.conn
+ # generate deterministic uuid if not provided
+ uuid = kwargs.pop("uuid", generate_uuid5(data_object))
+ try:
+ return client.data_object.create(data_object, class_name,
uuid=uuid, **kwargs)
+ except ObjectAlreadyExistsException:
+ self.log.warning("Object with the UUID %s already exists", uuid)
+ return None
+
+ def get_or_create_object(
+ self,
+ data_object: dict | str | None = None,
+ class_name: str | None = None,
+ vector: Sequence | None = None,
+ consistency_level: ConsistencyLevel | None = None,
+ tenant: str | None = None,
+ **kwargs,
+ ) -> str | dict[str, Any] | None:
+ """Get or Create a new object.
+
+ Returns the object if already exists
+
+ :param data_object: Object to be added. If type is str it should be
either a URL or a file. This is required
+ to create a new object.
+ :param class_name: Class name associated with the object given. This
is required to create a new object.
+ :param vector: Vector associated with the object given. This argument
is only used when creating object.
+ :param consistency_level: Consistency level to be used. Applies to
both create and get operations.
+ :tenant: Tenant to be used. Applies to both create and get operations.
+ :param kwargs: Additional parameters to be passed to
weaviate_client.data_object.create() and
+ weaviate_client.data_object.get()
+ """
+ obj = self.get_object(
+ class_name=class_name, consistency_level=consistency_level,
tenant=tenant, **kwargs
+ )
+ if not obj:
+ if not (data_object and class_name):
+ raise ValueError("data_object and class_name are required to
create a new object")
+ uuid = kwargs.pop("uuid", generate_uuid5(data_object))
+ return self.create_object(
+ data_object,
+ class_name,
+ vector=vector,
+ uuid=uuid,
+ consistency_level=consistency_level,
+ tenant=tenant,
+ )
+ return obj
Review Comment:
```suggestion
if obj:
return obj
if not (data_object and class_name):
raise ValueError("data_object and class_name are required to
create a new object")
uuid = kwargs.pop("uuid", generate_uuid5(data_object))
return self.create_object(
data_object,
class_name,
vector=vector,
uuid=uuid,
consistency_level=consistency_level,
tenant=tenant,
)
```
Suggestion to avoid the extra level of nesting by returning early when the object already exists
##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -200,3 +209,147 @@ def query_without_vector(
.do()
)
return results
+
+ def create_object(
+ self, data_object: dict | str, class_name: str, **kwargs
+ ) -> str | dict[str, Any] | None:
+ """Create a new object.
+
+ :param data_object: Object to be added. If type is str it should be
either a URL or a file.
+ :param class_name: Class name associated with the object given.
+ :param kwargs: Additional parameters to be passed to
weaviate_client.data_object.create()
+ """
+ client = self.conn
+ # generate deterministic uuid if not provided
+ uuid = kwargs.pop("uuid", generate_uuid5(data_object))
+ try:
+ return client.data_object.create(data_object, class_name,
uuid=uuid, **kwargs)
+ except ObjectAlreadyExistsException:
+ self.log.warning("Object with the UUID %s already exists", uuid)
+ return None
+
+ def get_or_create_object(
+ self,
+ data_object: dict | str | None = None,
+ class_name: str | None = None,
+ vector: Sequence | None = None,
+ consistency_level: ConsistencyLevel | None = None,
+ tenant: str | None = None,
+ **kwargs,
+ ) -> str | dict[str, Any] | None:
+ """Get or Create a new object.
+
+ Returns the object if already exists
+
+ :param data_object: Object to be added. If type is str it should be
either a URL or a file. This is required
+ to create a new object.
+ :param class_name: Class name associated with the object given. This
is required to create a new object.
+ :param vector: Vector associated with the object given. This argument
is only used when creating object.
+ :param consistency_level: Consistency level to be used. Applies to
both create and get operations.
+ :tenant: Tenant to be used. Applies to both create and get operations.
+ :param kwargs: Additional parameters to be passed to
weaviate_client.data_object.create() and
+ weaviate_client.data_object.get()
+ """
+ obj = self.get_object(
+ class_name=class_name, consistency_level=consistency_level,
tenant=tenant, **kwargs
+ )
+ if not obj:
+ if not (data_object and class_name):
+ raise ValueError("data_object and class_name are required to
create a new object")
+ uuid = kwargs.pop("uuid", generate_uuid5(data_object))
+ return self.create_object(
+ data_object,
+ class_name,
+ vector=vector,
+ uuid=uuid,
+ consistency_level=consistency_level,
+ tenant=tenant,
+ )
+ return obj
+
+ def get_object(self, **kwargs) -> dict[str, Any] | None:
+ """Get objects or an object from weaviate.
+
+ :param kwargs: parameters to be passed to
weaviate_client.data_object.get() or
+ weaviate_client.data_object.get_by_id()
+ """
+ client = self.conn
+ return client.data_object.get(**kwargs)
+
+ def get_all_objects(
+ self, after: str | UUID | None = None, as_dataframe: bool = False,
**kwargs
+ ) -> list[dict[str, Any]] | pd.DataFrame:
+ """Get all objects from weaviate.
+
+ if after is provided, it will be used as the starting point for the
listing.
+
+ :param after: uuid of the object to start listing from
+ :param as_dataframe: if True, returns a pandas dataframe
+ :param kwargs: parameters to be passed to
weaviate_client.data_object.get()
+ """
+ all_objects = []
+ after = kwargs.pop("after", after)
+ while True:
+ results = self.get_object(after=after, **kwargs) or {}
+ if not results.get("objects"):
+ break
+ all_objects.extend(results["objects"])
+ after = results["objects"][-1]["id"]
+ if as_dataframe:
+ import pandas
+
+ return pandas.DataFrame(all_objects)
+ return all_objects
+
+ def delete_object(self, uuid: UUID | str, **kwargs) -> None:
+ """Delete an object from weaviate.
+
+ :param uuid: uuid of the object to be deleted
+ :param kwargs: Optional parameters to be passed to
weaviate_client.data_object.delete()
+ """
+ client = self.conn
+ client.data_object.delete(uuid, **kwargs)
+
+ def update_object(self, data_object: dict | str, class_name: str, uuid:
UUID | str, **kwargs) -> None:
+ """Update an object in weaviate.
+
+ :param data_object: The object states the fields that should be
updated. Fields not specified in the
+ 'data_object' remain unchanged. Fields that are None will not be
changed.
+ If type is str it should be either an URL or a file.
+ :param class_name: Class name associated with the object given.
+ :param uuid: uuid of the object to be updated
+ :param kwargs: Optional parameters to be passed to
weaviate_client.data_object.update()
+ """
+ client = self.conn
+ client.data_object.update(data_object, class_name, uuid, **kwargs)
+
+ def replace_object(self, data_object: dict | str, class_name: str, uuid:
UUID | str, **kwargs) -> None:
+ """Replace an object in weaviate.
+
+ :param data_object: The object states the fields that should be
updated. Fields not specified in the
+ 'data_object' will be set to None. If type is str it should be
either an URL or a file.
+ :param class_name: Class name associated with the object given.
+ :param uuid: uuid of the object to be replaced
+ :param kwargs: Optional parameters to be passed to
weaviate_client.data_object.replace()
+ """
+ client = self.conn
+ client.data_object.replace(data_object, class_name, uuid, **kwargs)
+
+ def validate_object(self, data_object: dict | str, class_name: str,
**kwargs):
+ """Validate an object in weaviate.
Review Comment:
Can we elaborate on what validation means here and on the purpose of
this method? Is this method meant to be used to validate a data object
before actually inserting it into or updating it in the db?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]