This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b2305e1eea6 Add `S3VectorsPutVectorsOperator` (#66545)
b2305e1eea6 is described below
commit b2305e1eea6b9dffb353712b79f7bb58c30f2e7a
Author: John Jackson <[email protected]>
AuthorDate: Mon May 11 07:03:20 2026 -0700
Add `S3VectorsPutVectorsOperator` (#66545)
---
providers/amazon/docs/operators/s3_vectors.rst | 14 +++++++
.../providers/amazon/aws/operators/s3_vectors.py | 49 ++++++++++++++++++++++
.../tests/system/amazon/aws/example_s3_vectors.py | 11 +++++
.../unit/amazon/aws/operators/test_s3_vectors.py | 28 +++++++++++++
4 files changed, 102 insertions(+)
diff --git a/providers/amazon/docs/operators/s3_vectors.rst b/providers/amazon/docs/operators/s3_vectors.rst
index c645b4bc651..6e1d364377a 100644
--- a/providers/amazon/docs/operators/s3_vectors.rst
+++ b/providers/amazon/docs/operators/s3_vectors.rst
@@ -78,6 +78,20 @@ To delete an Amazon S3 Vectors vector bucket, use
:start-after: [START howto_operator_s3vectors_delete_vector_bucket]
:end-before: [END howto_operator_s3vectors_delete_vector_bucket]
+.. _howto/operator:S3VectorsPutVectorsOperator:
+
+Put Vectors
+-----------
+
+To insert vectors into an Amazon S3 Vectors index, use
+:class:`~airflow.providers.amazon.aws.operators.s3_vectors.S3VectorsPutVectorsOperator`.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_s3_vectors.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_s3vectors_put_vectors]
+ :end-before: [END howto_operator_s3vectors_put_vectors]
+
Reference
---------
diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/s3_vectors.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/s3_vectors.py
index c4d7acfba9d..1ca23e05bf2 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/s3_vectors.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/s3_vectors.py
@@ -249,3 +249,52 @@ class S3VectorsDeleteIndexOperator(AwsBaseOperator[AwsBaseHook]):
self.log.info("Deleting index %s from vector bucket %s", self.index_name, self.vector_bucket_name)
self.hook.conn.delete_index(vectorBucketName=self.vector_bucket_name, indexName=self.index_name)
self.log.info("Deleted index %s", self.index_name)
+
+
+class S3VectorsPutVectorsOperator(AwsBaseOperator[AwsBaseHook]):
+ """
+ Insert vectors into an Amazon S3 Vectors index.
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:S3VectorsPutVectorsOperator`
+
+ :param vector_bucket_name: The name of the vector bucket. (templated)
+ :param index_name: The name of the index. (templated)
+ :param vectors: List of vector dicts, each with ``key``, ``data``, and optional ``metadata``. (templated)
+ """
+
+ aws_hook_class = AwsBaseHook
+ template_fields: tuple[str, ...] = (
+ *AwsBaseOperator.template_fields,
+ "vector_bucket_name",
+ "index_name",
+ "vectors",
+ )
+ template_fields_renderers = {"vectors": "json"}
+
+ def __init__(
+ self,
+ *,
+ vector_bucket_name: str,
+ index_name: str,
+ vectors: list[dict[str, Any]],
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.vector_bucket_name = vector_bucket_name
+ self.index_name = index_name
+ self.vectors = vectors
+
+ @property
+ def _hook_parameters(self) -> dict[str, Any]:
+ return {**super()._hook_parameters, "client_type": "s3vectors"}
+
+ def execute(self, context: Context) -> None:
+ self.log.info("Putting %d vectors into index %s", len(self.vectors), self.index_name)
+ self.hook.conn.put_vectors(
+ vectorBucketName=self.vector_bucket_name,
+ indexName=self.index_name,
+ vectors=self.vectors,
+ )
+ self.log.info("Put %d vectors successfully", len(self.vectors))
diff --git a/providers/amazon/tests/system/amazon/aws/example_s3_vectors.py b/providers/amazon/tests/system/amazon/aws/example_s3_vectors.py
index bf1ef435b31..9faedd9549e 100644
--- a/providers/amazon/tests/system/amazon/aws/example_s3_vectors.py
+++ b/providers/amazon/tests/system/amazon/aws/example_s3_vectors.py
@@ -23,6 +23,7 @@ from airflow.providers.amazon.aws.operators.s3_vectors import (
S3VectorsCreateVectorBucketOperator,
S3VectorsDeleteIndexOperator,
S3VectorsDeleteVectorBucketOperator,
+ S3VectorsPutVectorsOperator,
)
from airflow.providers.common.compat.sdk import DAG, chain
@@ -68,6 +69,15 @@ with DAG(
)
# [END howto_operator_s3vectors_create_index]
+ # [START howto_operator_s3vectors_put_vectors]
+ put_vectors = S3VectorsPutVectorsOperator(
+ task_id="put_vectors",
+ vector_bucket_name=bucket_name,
+ index_name=index_name,
+ vectors=[{"key": "test-vec-1", "data": {"float32": [0.1, 0.2, 0.3, 0.4]}}],
+ )
+ # [END howto_operator_s3vectors_put_vectors]
+
# [START howto_operator_s3vectors_delete_vector_bucket]
delete_vector_bucket = S3VectorsDeleteVectorBucketOperator(
task_id="delete_vector_bucket",
@@ -89,6 +99,7 @@ with DAG(
test_context,
create_vector_bucket,
create_index,
+ put_vectors,
delete_index,
delete_vector_bucket,
)
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_s3_vectors.py b/providers/amazon/tests/unit/amazon/aws/operators/test_s3_vectors.py
index 2a3acb8bd90..e483d0396db 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_s3_vectors.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_s3_vectors.py
@@ -26,6 +26,7 @@ from airflow.providers.amazon.aws.operators.s3_vectors import (
S3VectorsCreateVectorBucketOperator,
S3VectorsDeleteIndexOperator,
S3VectorsDeleteVectorBucketOperator,
+ S3VectorsPutVectorsOperator,
)
from unit.amazon.aws.utils.test_template_fields import validate_template_fields
@@ -222,3 +223,30 @@ class TestS3VectorsDeleteIndexOperator:
def test_template_fields(self):
validate_template_fields(self.operator)
+
+
+class TestS3VectorsPutVectorsOperator:
+ VECTORS = [{"key": "vec1", "data": {"float32": [0.1, 0.2, 0.3]}}]
+
+ def setup_method(self):
+ self.operator = S3VectorsPutVectorsOperator(
+ task_id="put_vectors",
+ vector_bucket_name=BUCKET_NAME,
+ index_name=INDEX_NAME,
+ vectors=self.VECTORS,
+ )
+
+ def test_execute(self):
+ mock_conn = MagicMock()
+ self.operator.hook.conn = mock_conn
+
+ self.operator.execute({})
+
+ mock_conn.put_vectors.assert_called_once_with(
+ vectorBucketName=BUCKET_NAME,
+ indexName=INDEX_NAME,
+ vectors=self.VECTORS,
+ )
+
+ def test_template_fields(self):
+ validate_template_fields(self.operator)