josh-fell commented on code in PR #36177:
URL: https://github.com/apache/airflow/pull/36177#discussion_r1433447016


##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -396,10 +400,25 @@ def batch_data(
             .. seealso:: `batch_config_params options 
<https://weaviate-python-client.readthedocs.io/en/v3.25.3/weaviate.batch.html#weaviate.batch.Batch.configure>`__
         :param vector_col: name of the column containing the vector.
         :param retry_attempts_per_object: number of time to try in case of 
failure before giving up.
+        :param tenant: The tenant to which the object will be added.
+        :param uuid_col: Name of the column containing the UUID.
+        :param insertion_errors: list to hold errors while inserting.
         """
         client = self.conn
         if not batch_config_params:
             batch_config_params = {}
+
+        # configuration for context manager for __exit__ method to callback on 
errors for weaviate
+        # batch ingestion.
+        if not batch_config_params.get("callback"):
+            batch_config_params.update({"callback": 
partial(self.process_batch_errors, insertion_errors)})
+
+        if not batch_config_params.get("timeout_retries"):
+            batch_config_params.update({"timeout_retries": 5})
+
+        if not batch_config_params.get("connection_error_retries"):
+            batch_config_params.update({"connection_error_retries": 5})

Review Comment:
   _Maybe_ an optimization and readability suggestion, but why not assign the 
value directly?
   ```suggestion
           if not batch_config_params.get("callback"):
               batch_config_params["callback"] = 
partial(self.process_batch_errors, insertion_errors)
   
           if not batch_config_params.get("timeout_retries"):
               batch_config_params["timeout_retries"] = 5
   
           if not batch_config_params.get("connection_error_retries"):
               batch_config_params["connection_error_retries"] = 5
   ```



##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -606,3 +636,303 @@ def object_exists(self, uuid: str | UUID, **kwargs) -> 
bool:
         """
         client = self.conn
         return client.data_object.exists(uuid, **kwargs)
+
+    def _delete_objects(self, uuids: Collection, class_name: str, 
retry_attempts_per_object: int = 5):
+        """
+        Helper function for `create_or_replace_objects()` to delete multiple 
objects.
+
+        :param uuids: Collection of uuids.
+        :param class_name: Name of the class in Weaviate schema where data is 
to be ingested.
+        :param retry_attempts_per_object: number of time to try in case of 
failure before giving up.
+        """
+        for uuid in uuids:
+            for attempt in Retrying(
+                stop=stop_after_attempt(retry_attempts_per_object),
+                retry=(
+                    retry_if_exception(lambda exc: 
check_http_error_is_retryable(exc))
+                    | retry_if_exception_type(REQUESTS_EXCEPTIONS_TYPES)
+                ),
+            ):
+                with attempt:
+                    try:
+                        self.delete_object(uuid=uuid, class_name=class_name)
+                        self.log.debug("Deleted object with uuid %s", uuid)
+                    except weaviate.exceptions.UnexpectedStatusCodeException 
as e:
+                        if e.status_code == 404:
+                            self.log.debug("Tried to delete a non existent 
object with uuid %s", uuid)
+                        else:
+                            self.log.debug("Error occurred while trying to 
delete object with uuid %s", uuid)
+                            raise e
+
+        self.log.info("Deleted %s objects.", len(uuids))
+
+    def _generate_uuids(
+        self,
+        df: pd.DataFrame,
+        class_name: str,
+        unique_columns: list[str],
+        vector_column: str | None = None,
+        uuid_column: str | None = None,
+    ) -> tuple[pd.DataFrame, str]:
+        """
+        Adds UUIDs to a DataFrame, useful for replace operations where UUIDs 
must be known before ingestion.
+
+        By default, UUIDs are generated using a custom function if 
'uuid_column' is not specified.
+        The function can potentially ingest the same data multiple times with 
different UUIDs.
+
+        :param df: A dataframe with data to generate a UUID from.
+        :param class_name: The name of the class use as part of the uuid 
namespace.
+        :param uuid_column: Name of the column to create. Default is 'id'.
+        :param unique_columns: A list of columns to use for UUID generation. 
By default, all columns except
+            vector_column will be used.
+        :param vector_column: Name of the column containing the vector data.  
If specified the vector will be
+            removed prior to generating the uuid.
+        """
+        column_names = df.columns.to_list()
+
+        difference_columns = 
set(unique_columns).difference(set(df.columns.to_list()))
+        if difference_columns:
+            raise ValueError(f"Columns {', '.join(difference_columns)} don't 
exist in dataframe")
+
+        if uuid_column is None:
+            self.log.info("No uuid_column provided. Generating UUIDs as column 
name `id`.")
+            if "id" in column_names:
+                raise ValueError(
+                    "Property 'id' already in dataset. Consider renaming or 
specify 'uuid_column'."
+                )
+            else:
+                uuid_column = "id"
+
+        if uuid_column in column_names:
+            raise ValueError(
+                f"Property {uuid_column} already in dataset. Consider renaming 
or specify a different"
+                f" 'uuid_column'."
+            )
+
+        df[uuid_column] = (
+            df[unique_columns]
+            .drop(columns=[vector_column], inplace=False, errors="ignore")
+            .apply(lambda row: generate_uuid5(identifier=row.to_dict(), 
namespace=class_name), axis=1)
+        )
+
+        return df, uuid_column
+
+    def _check_existing_documents(
+        self, data: pd.DataFrame, document_column: str, class_name: str, 
uuid_column: str
+    ) -> tuple[set, set]:
+        """
+        Get all object uuids belonging to a document.
+
+        :param data: A single pandas DataFrame.
+        :param document_column: The name of the property to query.
+        :param class_name: The name of the class to query.
+        :param uuid_column: The name of the column containing the UUID.
+        """
+        offset = 0
+        limit = 2000
+        documents_to_uuid: dict = {}
+        existing_documents = set()
+        document_keys = set(data[document_column])
+        non_existing_documents = document_keys.copy()
+        while True:
+            data_objects = (
+                self.conn.query.get(properties=[document_column], 
class_name=class_name)
+                .with_additional([uuid_column])
+                .with_where(
+                    {
+                        "operator": "Or",
+                        "operands": [
+                            {"valueText": key, "path": document_column, 
"operator": "Equal"}
+                            for key in document_keys
+                        ],
+                    }
+                )
+                .with_offset(offset)
+                .with_limit(limit)
+                .do()["data"]["Get"][class_name]
+            )
+            if len(data_objects) == 0:
+                break
+            for data_object in data_objects:
+                document_url = data_object[document_column]
+
+                if document_url not in documents_to_uuid:
+                    documents_to_uuid[document_url] = set()
+                    existing_documents.add(document_url)
+                    non_existing_documents.remove(document_url)
+
+                
documents_to_uuid[document_url].add(data_object["_additional"][uuid_column])
+            offset = offset + limit
+        return existing_documents, non_existing_documents
+
+    def _delete_all_documents_objects(
+        self,
+        document_keys: list[str],
+        document_column: str,
+        class_name: str,
+        batch_delete_error: list | None = None,
+        tenant: str | None = None,
+        batch_config_params: dict[str, Any] | None = None,
+    ):
+        if not batch_config_params:
+            batch_config_params = {}
+
+        # configuration for context manager for __exit__ method to callback on 
errors for weaviate
+        # batch ingestion.
+        if not batch_config_params.get("callback"):
+            batch_config_params.update({"callback": 
partial(self.process_batch_errors, batch_delete_error)})
+
+        self.conn.batch.configure(**batch_config_params)
+
+        with self.conn.batch as batch:
+            batch.consistency_level = 
weaviate.data.replication.ConsistencyLevel.ALL
+            batch.delete_objects(
+                class_name=class_name,
+                # same where operator as in the GraphQL API
+                where={
+                    "operator": "Or",
+                    "operands": [
+                        {
+                            "path": [document_column],
+                            "operator": "Equal",
+                            "valueText": key,
+                        }
+                        for key in document_keys
+                    ],
+                },
+                output="verbose",
+                dry_run=False,
+                tenant=tenant,
+            )
+        return batch_delete_error
+
+    def process_batch_errors(self, batch_errors: list, results: list, verbose: 
bool = True) -> None:
+        """
+        Processes the results from batch operation and collects any errors.
+
+        :param batch_errors: list to populate in case of error
+        :param results: Results from the batch operation.
+        :param verbose: Flag to enable verbose logging.
+        """
+        for item in results:
+            if "errors" in item["result"]:
+                item_error = {"uuid": item["id"], "errors": 
item["result"]["errors"]}
+                if verbose:
+                    self.log.info(
+                        f"Error occurred in batch process for {item['id']} 
with error {item['result']['errors']}"
+                    )
+                batch_errors.append(item_error)
+
+    def create_or_replace_document_objects(
+        self,
+        data: pd.DataFrame | list[dict[str, Any]],
+        class_name: str,
+        document_column: str,
+        existing: str = "skip",
+        uuid_column: str | None = None,
+        vector_column: str = "Vector",
+        batch_config_params: dict | None = None,
+        tenant: str | None = None,
+    ):
+        """
+        create or replace objects belonging to documents.
+
+        In real-world scenarios, information sources like Airflow docs, Stack 
Overflow, or other issues
+        are considered 'documents' here. It's crucial to keep the database 
objects in sync with these sources.
+        If any changes occur in these documents, this function aims to reflect 
those changes in the database.
+
+        Note: This function assumes responsibility for identifying changes in 
documents, dropping relevant
+        database objects, and recreating them based on updated information. 
It's crucial to handle this
+        process with care, ensuring backups and validation are in place to 
prevent data loss or
+         inconsistencies.

Review Comment:
   ```suggestion
           .. note::
               This function assumes responsibility for identifying changes in 
documents, dropping relevant
               database objects, and recreating them based on updated 
information. It's crucial to handle this
               process with care, ensuring backups and validation are in place 
to prevent data loss or
               inconsistencies.
   ```
   It might be worth using the native admonition here to make it more obvious 
in the API docs. WDYT?



##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -606,3 +636,303 @@ def object_exists(self, uuid: str | UUID, **kwargs) -> 
bool:
         """
         client = self.conn
         return client.data_object.exists(uuid, **kwargs)
+
+    def _delete_objects(self, uuids: Collection, class_name: str, 
retry_attempts_per_object: int = 5):
+        """
+        Helper function for `create_or_replace_objects()` to delete multiple 
objects.
+
+        :param uuids: Collection of uuids.
+        :param class_name: Name of the class in Weaviate schema where data is 
to be ingested.
+        :param retry_attempts_per_object: number of time to try in case of 
failure before giving up.
+        """
+        for uuid in uuids:
+            for attempt in Retrying(
+                stop=stop_after_attempt(retry_attempts_per_object),
+                retry=(
+                    retry_if_exception(lambda exc: 
check_http_error_is_retryable(exc))
+                    | retry_if_exception_type(REQUESTS_EXCEPTIONS_TYPES)
+                ),
+            ):
+                with attempt:
+                    try:
+                        self.delete_object(uuid=uuid, class_name=class_name)
+                        self.log.debug("Deleted object with uuid %s", uuid)
+                    except weaviate.exceptions.UnexpectedStatusCodeException 
as e:
+                        if e.status_code == 404:
+                            self.log.debug("Tried to delete a non existent 
object with uuid %s", uuid)
+                        else:
+                            self.log.debug("Error occurred while trying to 
delete object with uuid %s", uuid)
+                            raise e
+
+        self.log.info("Deleted %s objects.", len(uuids))
+
+    def _generate_uuids(
+        self,
+        df: pd.DataFrame,
+        class_name: str,
+        unique_columns: list[str],
+        vector_column: str | None = None,
+        uuid_column: str | None = None,
+    ) -> tuple[pd.DataFrame, str]:
+        """
+        Adds UUIDs to a DataFrame, useful for replace operations where UUIDs 
must be known before ingestion.
+
+        By default, UUIDs are generated using a custom function if 
'uuid_column' is not specified.
+        The function can potentially ingest the same data multiple times with 
different UUIDs.
+
+        :param df: A dataframe with data to generate a UUID from.
+        :param class_name: The name of the class use as part of the uuid 
namespace.
+        :param uuid_column: Name of the column to create. Default is 'id'.
+        :param unique_columns: A list of columns to use for UUID generation. 
By default, all columns except
+            vector_column will be used.
+        :param vector_column: Name of the column containing the vector data.  
If specified the vector will be
+            removed prior to generating the uuid.
+        """
+        column_names = df.columns.to_list()
+
+        difference_columns = 
set(unique_columns).difference(set(df.columns.to_list()))
+        if difference_columns:
+            raise ValueError(f"Columns {', '.join(difference_columns)} don't 
exist in dataframe")
+
+        if uuid_column is None:
+            self.log.info("No uuid_column provided. Generating UUIDs as column 
name `id`.")
+            if "id" in column_names:
+                raise ValueError(
+                    "Property 'id' already in dataset. Consider renaming or 
specify 'uuid_column'."
+                )
+            else:
+                uuid_column = "id"
+
+        if uuid_column in column_names:
+            raise ValueError(
+                f"Property {uuid_column} already in dataset. Consider renaming 
or specify a different"
+                f" 'uuid_column'."
+            )
+
+        df[uuid_column] = (
+            df[unique_columns]
+            .drop(columns=[vector_column], inplace=False, errors="ignore")
+            .apply(lambda row: generate_uuid5(identifier=row.to_dict(), 
namespace=class_name), axis=1)
+        )
+
+        return df, uuid_column
+
+    def _check_existing_documents(
+        self, data: pd.DataFrame, document_column: str, class_name: str, 
uuid_column: str
+    ) -> tuple[set, set]:
+        """
+        Get all object uuids belonging to a document.
+
+        :param data: A single pandas DataFrame.
+        :param document_column: The name of the property to query.
+        :param class_name: The name of the class to query.
+        :param uuid_column: The name of the column containing the UUID.
+        """
+        offset = 0
+        limit = 2000
+        documents_to_uuid: dict = {}
+        existing_documents = set()
+        document_keys = set(data[document_column])
+        non_existing_documents = document_keys.copy()
+        while True:
+            data_objects = (
+                self.conn.query.get(properties=[document_column], 
class_name=class_name)
+                .with_additional([uuid_column])
+                .with_where(
+                    {
+                        "operator": "Or",
+                        "operands": [
+                            {"valueText": key, "path": document_column, 
"operator": "Equal"}
+                            for key in document_keys
+                        ],
+                    }
+                )
+                .with_offset(offset)
+                .with_limit(limit)
+                .do()["data"]["Get"][class_name]
+            )
+            if len(data_objects) == 0:
+                break
+            for data_object in data_objects:
+                document_url = data_object[document_column]
+
+                if document_url not in documents_to_uuid:
+                    documents_to_uuid[document_url] = set()
+                    existing_documents.add(document_url)
+                    non_existing_documents.remove(document_url)
+
+                
documents_to_uuid[document_url].add(data_object["_additional"][uuid_column])
+            offset = offset + limit
+        return existing_documents, non_existing_documents
+
+    def _delete_all_documents_objects(
+        self,
+        document_keys: list[str],
+        document_column: str,
+        class_name: str,
+        batch_delete_error: list | None = None,
+        tenant: str | None = None,
+        batch_config_params: dict[str, Any] | None = None,
+    ):
+        if not batch_config_params:
+            batch_config_params = {}
+
+        # configuration for context manager for __exit__ method to callback on 
errors for weaviate
+        # batch ingestion.
+        if not batch_config_params.get("callback"):
+            batch_config_params.update({"callback": 
partial(self.process_batch_errors, batch_delete_error)})

Review Comment:
   Same here.



##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -606,3 +636,303 @@ def object_exists(self, uuid: str | UUID, **kwargs) -> 
bool:
         """
         client = self.conn
         return client.data_object.exists(uuid, **kwargs)
+
+    def _delete_objects(self, uuids: Collection, class_name: str, 
retry_attempts_per_object: int = 5):
+        """
+        Helper function for `create_or_replace_objects()` to delete multiple 
objects.
+
+        :param uuids: Collection of uuids.
+        :param class_name: Name of the class in Weaviate schema where data is 
to be ingested.
+        :param retry_attempts_per_object: number of time to try in case of 
failure before giving up.

Review Comment:
   ```suggestion
           :param retry_attempts_per_object: number of times to try in case of 
failure before giving up.
   ```



##########
airflow/providers/weaviate/hooks/weaviate.py:
##########
@@ -606,3 +636,303 @@ def object_exists(self, uuid: str | UUID, **kwargs) -> 
bool:
         """
         client = self.conn
         return client.data_object.exists(uuid, **kwargs)
+
+    def _delete_objects(self, uuids: Collection, class_name: str, 
retry_attempts_per_object: int = 5):
+        """
+        Helper function for `create_or_replace_objects()` to delete multiple 
objects.
+
+        :param uuids: Collection of uuids.
+        :param class_name: Name of the class in Weaviate schema where data is 
to be ingested.
+        :param retry_attempts_per_object: number of time to try in case of 
failure before giving up.
+        """
+        for uuid in uuids:
+            for attempt in Retrying(
+                stop=stop_after_attempt(retry_attempts_per_object),
+                retry=(
+                    retry_if_exception(lambda exc: 
check_http_error_is_retryable(exc))
+                    | retry_if_exception_type(REQUESTS_EXCEPTIONS_TYPES)
+                ),
+            ):
+                with attempt:
+                    try:
+                        self.delete_object(uuid=uuid, class_name=class_name)
+                        self.log.debug("Deleted object with uuid %s", uuid)
+                    except weaviate.exceptions.UnexpectedStatusCodeException 
as e:
+                        if e.status_code == 404:
+                            self.log.debug("Tried to delete a non existent 
object with uuid %s", uuid)
+                        else:
+                            self.log.debug("Error occurred while trying to 
delete object with uuid %s", uuid)
+                            raise e
+
+        self.log.info("Deleted %s objects.", len(uuids))
+
+    def _generate_uuids(
+        self,
+        df: pd.DataFrame,
+        class_name: str,
+        unique_columns: list[str],
+        vector_column: str | None = None,
+        uuid_column: str | None = None,
+    ) -> tuple[pd.DataFrame, str]:
+        """
+        Adds UUIDs to a DataFrame, useful for replace operations where UUIDs 
must be known before ingestion.
+
+        By default, UUIDs are generated using a custom function if 
'uuid_column' is not specified.
+        The function can potentially ingest the same data multiple times with 
different UUIDs.
+
+        :param df: A dataframe with data to generate a UUID from.
+        :param class_name: The name of the class use as part of the uuid 
namespace.
+        :param uuid_column: Name of the column to create. Default is 'id'.
+        :param unique_columns: A list of columns to use for UUID generation. 
By default, all columns except
+            vector_column will be used.
+        :param vector_column: Name of the column containing the vector data.  
If specified the vector will be
+            removed prior to generating the uuid.
+        """
+        column_names = df.columns.to_list()
+
+        difference_columns = 
set(unique_columns).difference(set(df.columns.to_list()))
+        if difference_columns:
+            raise ValueError(f"Columns {', '.join(difference_columns)} don't 
exist in dataframe")
+
+        if uuid_column is None:
+            self.log.info("No uuid_column provided. Generating UUIDs as column 
name `id`.")
+            if "id" in column_names:
+                raise ValueError(
+                    "Property 'id' already in dataset. Consider renaming or 
specify 'uuid_column'."
+                )
+            else:
+                uuid_column = "id"
+
+        if uuid_column in column_names:
+            raise ValueError(
+                f"Property {uuid_column} already in dataset. Consider renaming 
or specify a different"
+                f" 'uuid_column'."
+            )
+
+        df[uuid_column] = (
+            df[unique_columns]
+            .drop(columns=[vector_column], inplace=False, errors="ignore")
+            .apply(lambda row: generate_uuid5(identifier=row.to_dict(), 
namespace=class_name), axis=1)
+        )
+
+        return df, uuid_column
+
+    def _check_existing_documents(
+        self, data: pd.DataFrame, document_column: str, class_name: str, 
uuid_column: str
+    ) -> tuple[set, set]:
+        """
+        Get all object uuids belonging to a document.
+
+        :param data: A single pandas DataFrame.
+        :param document_column: The name of the property to query.
+        :param class_name: The name of the class to query.
+        :param uuid_column: The name of the column containing the UUID.
+        """
+        offset = 0
+        limit = 2000
+        documents_to_uuid: dict = {}
+        existing_documents = set()
+        document_keys = set(data[document_column])
+        non_existing_documents = document_keys.copy()
+        while True:
+            data_objects = (
+                self.conn.query.get(properties=[document_column], 
class_name=class_name)
+                .with_additional([uuid_column])
+                .with_where(
+                    {
+                        "operator": "Or",
+                        "operands": [
+                            {"valueText": key, "path": document_column, 
"operator": "Equal"}
+                            for key in document_keys
+                        ],
+                    }
+                )
+                .with_offset(offset)
+                .with_limit(limit)
+                .do()["data"]["Get"][class_name]
+            )
+            if len(data_objects) == 0:
+                break
+            for data_object in data_objects:
+                document_url = data_object[document_column]
+
+                if document_url not in documents_to_uuid:
+                    documents_to_uuid[document_url] = set()
+                    existing_documents.add(document_url)
+                    non_existing_documents.remove(document_url)
+
+                
documents_to_uuid[document_url].add(data_object["_additional"][uuid_column])
+            offset = offset + limit
+        return existing_documents, non_existing_documents
+
+    def _delete_all_documents_objects(
+        self,
+        document_keys: list[str],
+        document_column: str,
+        class_name: str,
+        batch_delete_error: list | None = None,
+        tenant: str | None = None,
+        batch_config_params: dict[str, Any] | None = None,
+    ):
+        if not batch_config_params:
+            batch_config_params = {}
+
+        # configuration for context manager for __exit__ method to callback on 
errors for weaviate
+        # batch ingestion.
+        if not batch_config_params.get("callback"):
+            batch_config_params.update({"callback": 
partial(self.process_batch_errors, batch_delete_error)})
+
+        self.conn.batch.configure(**batch_config_params)
+
+        with self.conn.batch as batch:
+            batch.consistency_level = 
weaviate.data.replication.ConsistencyLevel.ALL
+            batch.delete_objects(
+                class_name=class_name,
+                # same where operator as in the GraphQL API
+                where={
+                    "operator": "Or",
+                    "operands": [
+                        {
+                            "path": [document_column],
+                            "operator": "Equal",
+                            "valueText": key,
+                        }
+                        for key in document_keys
+                    ],
+                },
+                output="verbose",
+                dry_run=False,
+                tenant=tenant,
+            )
+        return batch_delete_error
+
+    def process_batch_errors(self, batch_errors: list, results: list, verbose: 
bool = True) -> None:
+        """
+        Processes the results from batch operation and collects any errors.
+
+        :param batch_errors: list to populate in case of error
+        :param results: Results from the batch operation.
+        :param verbose: Flag to enable verbose logging.
+        """
+        for item in results:
+            if "errors" in item["result"]:
+                item_error = {"uuid": item["id"], "errors": 
item["result"]["errors"]}
+                if verbose:
+                    self.log.info(
+                        f"Error occurred in batch process for {item['id']} 
with error {item['result']['errors']}"
+                    )
+                batch_errors.append(item_error)
+
+    def create_or_replace_document_objects(
+        self,
+        data: pd.DataFrame | list[dict[str, Any]],
+        class_name: str,
+        document_column: str,
+        existing: str = "skip",
+        uuid_column: str | None = None,
+        vector_column: str = "Vector",
+        batch_config_params: dict | None = None,
+        tenant: str | None = None,
+    ):
+        """
+        create or replace objects belonging to documents.
+
+        In real-world scenarios, information sources like Airflow docs, Stack 
Overflow, or other issues
+        are considered 'documents' here. It's crucial to keep the database 
objects in sync with these sources.
+        If any changes occur in these documents, this function aims to reflect 
those changes in the database.
+
+        Note: This function assumes responsibility for identifying changes in 
documents, dropping relevant
+        database objects, and recreating them based on updated information. 
It's crucial to handle this
+        process with care, ensuring backups and validation are in place to 
prevent data loss or
+         inconsistencies.
+
+        Provides users with multiple ways of dealing with existing values.
+            1. replace: replace the existing objects with new objects. This 
option requires to identify the
+             objects belonging to a document. which by default is done by 
using document_column field.
+            2. skip: skip the existing objects and only add the missing 
objects of a document.
+            3. error: raise an error if an object belonging to a existing 
document is tried to be created.
+
+        :param data: A single pandas DataFrame or a list of dicts to be 
ingested.
+        :param class_name: Name of the class in Weaviate schema where data is 
to be ingested.
+        :param existing: Strategy for handling existing data: 'skip', or 
'replace'. Default is 'skip'.
+        :param document_column: Column in DataFrame that identifying source 
document.
+        :param uuid_column: Column with pre-generated UUIDs. If not provided, 
UUIDs will be generated.
+        :param vector_column: Column with embedding vectors for pre-embedded 
data.
+        :param batch_config_params: Additional parameters for Weaviate batch 
configuration.
+        :param tenant: The tenant to which the object will be added.
+        :return: list of UUID which failed to create
+        """
+        import pandas as pd
+
+        if existing not in ["skip", "replace", "error"]:
+            raise ValueError("Invalid parameter for 'existing'. Choices are 
'skip', 'replace', 'error'.")
+
+        if isinstance(data, list):
+            data = pd.json_normalize(data)
+
+        unique_columns = sorted(data.columns.to_list())
+
+        self.log.info("Inserting %s objects.", data.shape[0])
+
+        if uuid_column is None or uuid_column not in data.columns:
+            (
+                data,
+                uuid_column,
+            ) = self._generate_uuids(
+                df=data,
+                class_name=class_name,
+                unique_columns=unique_columns,
+                vector_column=vector_column,
+                uuid_column=uuid_column,
+            )
+
+        # drop duplicate rows, using uuid_column and unique_columns. Removed  
`None` as it can be added to
+        # set when `uuid_column` is None.
+        data = data.drop_duplicates(subset=[document_column, uuid_column], 
keep="first")
+        batch_delete_error: list = []
+        existing_documents, non_existing_documents = 
self._check_existing_documents(
+            data=data,
+            document_column=document_column,
+            uuid_column=uuid_column,
+            class_name=class_name,
+        )
+        if existing == "error" and len(existing_documents):
+            raise ValueError(
+                f"Documents {', '.join(existing_documents)} already exists. 
You can either skip or replace"
+                f" them by passing 'existing=skip' or 'existing=replace' 
respectively."
+            )
+        elif existing == "skip":
+            data = data[data[document_column].isin(non_existing_documents)]
+        elif existing == "replace":
+            batch_delete_error = self._delete_all_documents_objects(
+                document_keys=list(existing_documents),
+                document_column=document_column,
+                class_name=class_name,
+                batch_delete_error=batch_delete_error,
+                tenant=tenant,
+                batch_config_params=batch_config_params,
+            )
+            data = 
data[data[document_column].isin(non_existing_documents.union(existing_documents))]
+
+        insertion_errors: list = []
+        if data.shape[0]:
+            self.log.info("Batch inserting %s objects.", data.shape[0])
+            insertion_errors = self.batch_data(
+                class_name=class_name,
+                data=data,
+                insertion_errors=insertion_errors,
+                batch_config_params=batch_config_params,
+                vector_col=vector_column,
+                uuid_col=uuid_column,
+                tenant=tenant,
+            )
+            if insertion_errors or batch_delete_error:
+                if insertion_errors:
+                    self.log.info("Failed to insert %s objects.", 
len(insertion_errors))
+                if batch_delete_error:
+                    self.log.info("Failed to delete %s objects.", 
len(insertion_errors))
+                # Rollback object that were not created properly
+                self._delete_objects(
+                    [item["uuid"] for item in insertion_errors + 
batch_delete_error], class_name=class_name

Review Comment:
   Could `batch_delete_error` ever be `None` or a non-list type?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to