This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3784cdf5ed init templated field explicitly in constructor (#36908)
3784cdf5ed is described below
commit 3784cdf5ed954bd356a3727988dd707cc5568a9c
Author: rom sharon <[email protected]>
AuthorDate: Sat Jan 20 11:28:27 2024 +0200
init templated field explicitly in constructor (#36908)
---
airflow/providers/weaviate/hooks/weaviate.py | 10 +++++-----
airflow/providers/weaviate/operators/weaviate.py | 7 +++----
2 files changed, 8 insertions(+), 9 deletions(-)
diff --git a/airflow/providers/weaviate/hooks/weaviate.py
b/airflow/providers/weaviate/hooks/weaviate.py
index 470d9bf54f..ccca02d357 100644
--- a/airflow/providers/weaviate/hooks/weaviate.py
+++ b/airflow/providers/weaviate/hooks/weaviate.py
@@ -183,7 +183,7 @@ class WeaviateHook(BaseHook):
client.schema.create(schema_json)
@staticmethod
- def _convert_dataframe_to_list(data: list[dict[str, Any]] | pd.DataFrame)
-> list[dict[str, Any]]:
+ def _convert_dataframe_to_list(data: list[dict[str, Any]] | pd.DataFrame |
None) -> list[dict[str, Any]]:
"""Helper function to convert dataframe to list of dicts.
In scenario where Pandas isn't installed and we pass data as a list of
dictionaries, importing
@@ -382,7 +382,7 @@ class WeaviateHook(BaseHook):
def batch_data(
self,
class_name: str,
- data: list[dict[str, Any]] | pd.DataFrame,
+ data: list[dict[str, Any]] | pd.DataFrame | None,
batch_config_params: dict[str, Any] | None = None,
vector_col: str = "Vector",
uuid_col: str = "id",
@@ -401,7 +401,7 @@ class WeaviateHook(BaseHook):
:param retry_attempts_per_object: number of time to try in case of
failure before giving up.
:param tenant: The tenant to which the object will be added.
"""
- data = self._convert_dataframe_to_list(data)
+ converted_data = self._convert_dataframe_to_list(data)
total_results = 0
error_results = 0
insertion_errors: list = []
@@ -437,7 +437,7 @@ class WeaviateHook(BaseHook):
self.log.info(
"Total Objects %s / Objects %s successfully inserted and
Objects %s had errors.",
- len(data),
+ len(converted_data),
total_results,
error_results,
)
@@ -460,7 +460,7 @@ class WeaviateHook(BaseHook):
client.batch.configure(**batch_config_params)
with client.batch as batch:
# Batch import all data
- for index, data_obj in enumerate(data):
+ for index, data_obj in enumerate(converted_data):
for attempt in Retrying(
stop=stop_after_attempt(retry_attempts_per_object),
retry=(
diff --git a/airflow/providers/weaviate/operators/weaviate.py
b/airflow/providers/weaviate/operators/weaviate.py
index d4dadf261c..586caaa993 100644
--- a/airflow/providers/weaviate/operators/weaviate.py
+++ b/airflow/providers/weaviate/operators/weaviate.py
@@ -74,9 +74,8 @@ class WeaviateIngestOperator(BaseOperator):
self.input_json = input_json
self.uuid_column = uuid_column
self.tenant = tenant
- if input_data is not None:
- self.input_data = input_data
- elif input_json is not None:
+ self.input_data = input_data
+ if (self.input_data is None) and (input_json is not None):
warnings.warn(
"Passing 'input_json' to WeaviateIngestOperator is deprecated
and"
" you should use 'input_data' instead",
@@ -84,7 +83,7 @@ class WeaviateIngestOperator(BaseOperator):
stacklevel=2,
)
self.input_data = input_json
- else:
+ elif self.input_data is None and input_json is None:
raise TypeError("Either input_json or input_data is required")
@cached_property