pankajastro commented on code in PR #35502:
URL: https://github.com/apache/airflow/pull/35502#discussion_r1384848396
##########
airflow/providers/pinecone/hooks/pinecone.py:
##########
@@ -126,3 +133,212 @@ def upsert(
show_progress=show_progress,
**kwargs,
)
+
+ @staticmethod
+ def create_index(
+ index_name: str,
+ dimension: int,
+ timeout: int | None = None,
+ index_type: str | None = "approximated",
+ metric: str | None = "cosine",
+ replicas: int | None = 1,
+ shards: int | None = 1,
+ pods: int | None = 1,
+ pod_type: str | None = "p1",
+ index_config: dict[str, str] | None = None,
+ metadata_config: dict[str, str] | None = None,
+ source_collection: str | None = "",
Review Comment:
```suggestion
index_type: str | None = "approximated",
metric: str | None = "cosine",
replicas: int | None = 1,
shards: int | None = 1,
pods: int | None = 1,
pod_type: str | None = "p1",
index_config: dict[str, str] | None = None,
metadata_config: dict[str, str] | None = None,
source_collection: str | None = "",
timeout: int | None = None,
```
Nit: move `timeout` to the end so the parameter order matches the docstring.
##########
airflow/providers/pinecone/hooks/pinecone.py:
##########
@@ -126,3 +133,212 @@ def upsert(
show_progress=show_progress,
**kwargs,
)
+
+ @staticmethod
+ def create_index(
+ index_name: str,
+ dimension: int,
+ timeout: int | None = None,
+ index_type: str | None = "approximated",
+ metric: str | None = "cosine",
+ replicas: int | None = 1,
+ shards: int | None = 1,
+ pods: int | None = 1,
+ pod_type: str | None = "p1",
+ index_config: dict[str, str] | None = None,
+ metadata_config: dict[str, str] | None = None,
+ source_collection: str | None = "",
+ ) -> None:
+ """
+ Create a new index.
+
+ .. seealso:: https://docs.pinecone.io/reference/create_index/
+
+ :param index_name: The name of the index to create.
+ :param dimension: the dimension of vectors that would be inserted in
the index
+ :param index_type: type of index, one of {"approximated", "exact"},
defaults to "approximated".
+ :param metric: type of metric used in the vector index, one of
{"cosine", "dotproduct", "euclidean"}
+ :param replicas: the number of replicas, defaults to 1.
+ :param shards: the number of shards per index, defaults to 1.
+ :param pods: Total number of pods to be used by the index. pods =
shard*replicas
+ :param pod_type: the pod type to be used for the index. can be one of
p1 or s1.
+ :param index_config: Advanced configuration options for the index
+ :param metadata_config: Configuration related to the metadata index
+ :param source_collection: Collection name to create the index from
+ :param timeout: Timeout for wait until index gets ready.
+ """
+ pinecone.create_index(
+ name=index_name,
+ timeout=timeout,
+ index_type=index_type,
+ dimension=dimension,
+ metric=metric,
+ pods=pods,
+ replicas=replicas,
+ shards=shards,
+ pod_type=pod_type,
+ metadata_config=metadata_config,
+ source_collection=source_collection,
+ index_config=index_config,
+ )
+
+ @staticmethod
+ def describe_index(index_name: str) -> Any:
+ """
+ Retrieve information about a specific index.
+
+ :param index_name: The name of the index to describe.
+ """
+ return pinecone.describe_index(name=index_name)
+
+ @staticmethod
+ def delete_index(index_name: str, timeout: int | None = None) -> None:
+ """
+ Delete a specific index.
+
+ :param index_name: the name of the index.
+ :param timeout: Timeout for wait until index gets ready.
+ """
+ pinecone.delete_index(name=index_name, timeout=timeout)
+
+ @staticmethod
+ def configure_index(index_name: str, replicas: int | None = None,
pod_type: str | None = "") -> None:
+ """
+ Changes current configuration of the index.
+
+ :param index_name: The name of the index to configure.
+ :param replicas: The new number of replicas.
+ :param pod_type: the new pod_type for the index.
+ """
+ pinecone.configure_index(name=index_name, replicas=replicas,
pod_type=pod_type)
+
+ @staticmethod
+ def create_collection(collection_name: str, index_name: str) -> None:
+ """
+ Create a new collection from a specified index.
+
+ :param collection_name: The name of the collection to create.
+ :param index_name: The name of the source index.
+ """
+ pinecone.create_collection(name=collection_name, source=index_name)
+
+ @staticmethod
+ def delete_collection(collection_name: str) -> None:
+ """
+ Delete a specific collection.
+
+ :param collection_name: The name of the collection to delete.
+ """
+ pinecone.delete_collection(collection_name)
+
+ @staticmethod
+ def describe_collection(collection_name: str) -> Any:
+ """
+ Retrieve information about a specific collection.
+
+ :param collection_name: The name of the collection to describe.
+ """
+ return pinecone.describe_collection(collection_name)
+
+ @staticmethod
+ def list_collections() -> Any:
+ """Retrieve a list of all collections in the current project."""
+ return pinecone.list_collections()
+
+ @staticmethod
+ def query_vector(
+ index_name: str,
+ vector: list[Any],
+ query_id: str | None = None,
+ top_k: int = 10,
+ namespace: str | None = None,
+ query_filter: dict[str, str | float | int | bool | list[Any] |
dict[Any, Any]] | None = None,
+ include_values: bool | None = None,
+ include_metadata: bool | None = None,
+ sparse_vector: SparseValues | dict[str, list[float] | list[int]] |
None = None,
+ ) -> QueryResponse:
+ """
+ The Query operation searches a namespace, using a query vector.
+
+ It retrieves the ids of the most similar items in a namespace, along
with their similarity scores.
+ API reference: https://docs.pinecone.io/reference/query
+
+ :param index_name: The name of the index to query.
+ :param vector: The query vector.
+ :param query_id: The unique ID of the vector to be used as a query
vector.
+ :param top_k: The number of results to return.
+ :param namespace: The namespace to fetch vectors from. If not
specified, the default namespace is used.
+ :param query_filter: The filter to apply. See
https://www.pinecone.io/docs/metadata-filtering/
+ :param include_values: Whether to include the vector values in the
result.
+ :param include_metadata: Indicates whether metadata is included in the
response as well as the ids.
+ :param sparse_vector: sparse values of the query vector. Expected to
be either a SparseValues object or a dict
+ of the form: {'indices': List[int], 'values': List[float]}, where the
lists each have the same length.
+ """
+ index = pinecone.Index(index_name)
+ return index.query(
+ vector=vector,
+ id=query_id,
+ top_k=top_k,
+ namespace=namespace,
+ filter=query_filter,
+ include_values=include_values,
+ include_metadata=include_metadata,
+ sparse_vector=sparse_vector,
+ )
+
+ @staticmethod
+ def _chunks(iterable: list[Any], batch_size: int = 100) -> Any:
+ """Helper function to break an iterable into chunks of size
batch_size."""
+ it = iter(iterable)
+ chunk = tuple(itertools.islice(it, batch_size))
+ while chunk:
+ yield chunk
+ chunk = tuple(itertools.islice(it, batch_size))
+
+ def upsert_data_async(
+ self,
+ index_name: str,
+ data: list[tuple[Any]],
+ async_req: bool = False,
+ pool_threads: int | None = None,
+ ) -> None | list[Any]:
+ """
+ Upserts (insert/update) data into the Pinecone index.
+
+ :param index_name: Name of the index.
+ :param data: List of tuples to be upserted. Each tuple is of form (id,
vector, metadata).
+ Metadata is optional.
+ :param async_req: If True, upsert operations will be asynchronous.
+ :param pool_threads: Number of threads for parallel upserting. If
async_req is True, this must be provided.
+ :return: Upsert responses or None for async operations.
Review Comment:
```suggestion
```
Maybe remove this `:return:` line, if we want to stay consistent with the other docstrings?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]