This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b2305e1eea6 Add `S3VectorsPutVectorsOperator` (#66545)
b2305e1eea6 is described below

commit b2305e1eea6b9dffb353712b79f7bb58c30f2e7a
Author: John Jackson <[email protected]>
AuthorDate: Mon May 11 07:03:20 2026 -0700

    Add `S3VectorsPutVectorsOperator` (#66545)
---
 providers/amazon/docs/operators/s3_vectors.rst     | 14 +++++++
 .../providers/amazon/aws/operators/s3_vectors.py   | 49 ++++++++++++++++++++++
 .../tests/system/amazon/aws/example_s3_vectors.py  | 11 +++++
 .../unit/amazon/aws/operators/test_s3_vectors.py   | 28 +++++++++++++
 4 files changed, 102 insertions(+)

diff --git a/providers/amazon/docs/operators/s3_vectors.rst 
b/providers/amazon/docs/operators/s3_vectors.rst
index c645b4bc651..6e1d364377a 100644
--- a/providers/amazon/docs/operators/s3_vectors.rst
+++ b/providers/amazon/docs/operators/s3_vectors.rst
@@ -78,6 +78,20 @@ To delete an Amazon S3 Vectors vector bucket, use
     :start-after: [START howto_operator_s3vectors_delete_vector_bucket]
     :end-before: [END howto_operator_s3vectors_delete_vector_bucket]
 
+.. _howto/operator:S3VectorsPutVectorsOperator:
+
+Put Vectors
+-----------
+
+To insert vectors into an Amazon S3 Vectors index, use
+:class:`~airflow.providers.amazon.aws.operators.s3_vectors.S3VectorsPutVectorsOperator`.
+
+.. exampleinclude:: /../../amazon/tests/system/amazon/aws/example_s3_vectors.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_s3vectors_put_vectors]
+    :end-before: [END howto_operator_s3vectors_put_vectors]
+
 Reference
 ---------
 
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/operators/s3_vectors.py 
b/providers/amazon/src/airflow/providers/amazon/aws/operators/s3_vectors.py
index c4d7acfba9d..1ca23e05bf2 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/s3_vectors.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/s3_vectors.py
@@ -249,3 +249,52 @@ class 
S3VectorsDeleteIndexOperator(AwsBaseOperator[AwsBaseHook]):
         self.log.info("Deleting index %s from vector bucket %s", 
self.index_name, self.vector_bucket_name)
         self.hook.conn.delete_index(vectorBucketName=self.vector_bucket_name, 
indexName=self.index_name)
         self.log.info("Deleted index %s", self.index_name)
+
+
+class S3VectorsPutVectorsOperator(AwsBaseOperator[AwsBaseHook]):
+    """
+    Insert vectors into an Amazon S3 Vectors index.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:S3VectorsPutVectorsOperator`
+
+    :param vector_bucket_name: The name of the vector bucket. (templated)
+    :param index_name: The name of the index. (templated)
+    :param vectors: List of vector dicts, each with ``key``, ``data``, and 
optional ``metadata``. (templated)
+    """
+
+    aws_hook_class = AwsBaseHook
+    template_fields: tuple[str, ...] = (
+        *AwsBaseOperator.template_fields,
+        "vector_bucket_name",
+        "index_name",
+        "vectors",
+    )
+    template_fields_renderers = {"vectors": "json"}
+
+    def __init__(
+        self,
+        *,
+        vector_bucket_name: str,
+        index_name: str,
+        vectors: list[dict[str, Any]],
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.vector_bucket_name = vector_bucket_name
+        self.index_name = index_name
+        self.vectors = vectors
+
+    @property
+    def _hook_parameters(self) -> dict[str, Any]:
+        return {**super()._hook_parameters, "client_type": "s3vectors"}
+
+    def execute(self, context: Context) -> None:
+        self.log.info("Putting %d vectors into index %s", len(self.vectors), 
self.index_name)
+        self.hook.conn.put_vectors(
+            vectorBucketName=self.vector_bucket_name,
+            indexName=self.index_name,
+            vectors=self.vectors,
+        )
+        self.log.info("Put %d vectors successfully", len(self.vectors))
diff --git a/providers/amazon/tests/system/amazon/aws/example_s3_vectors.py 
b/providers/amazon/tests/system/amazon/aws/example_s3_vectors.py
index bf1ef435b31..9faedd9549e 100644
--- a/providers/amazon/tests/system/amazon/aws/example_s3_vectors.py
+++ b/providers/amazon/tests/system/amazon/aws/example_s3_vectors.py
@@ -23,6 +23,7 @@ from airflow.providers.amazon.aws.operators.s3_vectors import 
(
     S3VectorsCreateVectorBucketOperator,
     S3VectorsDeleteIndexOperator,
     S3VectorsDeleteVectorBucketOperator,
+    S3VectorsPutVectorsOperator,
 )
 from airflow.providers.common.compat.sdk import DAG, chain
 
@@ -68,6 +69,15 @@ with DAG(
     )
     # [END howto_operator_s3vectors_create_index]
 
+    # [START howto_operator_s3vectors_put_vectors]
+    put_vectors = S3VectorsPutVectorsOperator(
+        task_id="put_vectors",
+        vector_bucket_name=bucket_name,
+        index_name=index_name,
+        vectors=[{"key": "test-vec-1", "data": {"float32": [0.1, 0.2, 0.3, 
0.4]}}],
+    )
+    # [END howto_operator_s3vectors_put_vectors]
+
     # [START howto_operator_s3vectors_delete_vector_bucket]
     delete_vector_bucket = S3VectorsDeleteVectorBucketOperator(
         task_id="delete_vector_bucket",
@@ -89,6 +99,7 @@ with DAG(
         test_context,
         create_vector_bucket,
         create_index,
+        put_vectors,
         delete_index,
         delete_vector_bucket,
     )
diff --git 
a/providers/amazon/tests/unit/amazon/aws/operators/test_s3_vectors.py 
b/providers/amazon/tests/unit/amazon/aws/operators/test_s3_vectors.py
index 2a3acb8bd90..e483d0396db 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_s3_vectors.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_s3_vectors.py
@@ -26,6 +26,7 @@ from airflow.providers.amazon.aws.operators.s3_vectors import 
(
     S3VectorsCreateVectorBucketOperator,
     S3VectorsDeleteIndexOperator,
     S3VectorsDeleteVectorBucketOperator,
+    S3VectorsPutVectorsOperator,
 )
 
 from unit.amazon.aws.utils.test_template_fields import validate_template_fields
@@ -222,3 +223,30 @@ class TestS3VectorsDeleteIndexOperator:
 
     def test_template_fields(self):
         validate_template_fields(self.operator)
+
+
+class TestS3VectorsPutVectorsOperator:
+    VECTORS = [{"key": "vec1", "data": {"float32": [0.1, 0.2, 0.3]}}]
+
+    def setup_method(self):
+        self.operator = S3VectorsPutVectorsOperator(
+            task_id="put_vectors",
+            vector_bucket_name=BUCKET_NAME,
+            index_name=INDEX_NAME,
+            vectors=self.VECTORS,
+        )
+
+    def test_execute(self):
+        mock_conn = MagicMock()
+        self.operator.hook.conn = mock_conn
+
+        self.operator.execute({})
+
+        mock_conn.put_vectors.assert_called_once_with(
+            vectorBucketName=BUCKET_NAME,
+            indexName=INDEX_NAME,
+            vectors=self.VECTORS,
+        )
+
+    def test_template_fields(self):
+        validate_template_fields(self.operator)

Reply via email to