This is an automated email from the ASF dual-hosted git repository.

ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-python-thin-client.git


The following commit(s) were added to refs/heads/master by this push:
     new 94e619a  IGNITE-18006 Support timeouts in async cache operations (#58)
94e619a is described below

commit 94e619afe3b1b58839670df2993a0e61d0e8d863
Author: Ivan Daschinskiy <[email protected]>
AuthorDate: Mon Nov 7 13:58:22 2022 +0300

    IGNITE-18006 Support timeouts in async cache operations (#58)
---
 .github/workflows/pr_check.yml                  |  61 ++++++
 .travis.yml                                     |  54 -----
 pyignite/aio_cache.py                           |  97 ++++++---
 pyignite/aio_cluster.py                         |   4 +-
 pyignite/connection/aio_connection.py           |  15 +-
 pyignite/utils.py                               |  18 +-
 requirements/install.txt                        |   1 -
 requirements/tests.txt                          |   3 +-
 setup.py                                        |   1 +
 tests/affinity/test_affinity_request_routing.py |   7 +-
 tests/custom/test_timeouts.py                   | 273 ++++++++++++++++++++++++
 tests/util.py                                   |  17 +-
 tox.ini                                         |   4 +-
 13 files changed, 430 insertions(+), 125 deletions(-)

diff --git a/.github/workflows/pr_check.yml b/.github/workflows/pr_check.yml
new file mode 100644
index 0000000..2330e1d
--- /dev/null
+++ b/.github/workflows/pr_check.yml
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Check code style and run tests
+on:
+  pull_request:
+  push:
+    branches:
+      - master
+      - 'pyignite-*'
+
+env:
+  IGNITE_VERSION: 2.14.0
+  IGNITE_HOME: /opt/ignite
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        cfg:
+          - { python: "3.7", toxenv: "py37" }
+          - { python: "3.8", toxenv: "py38" }
+          - { python: "3.9", toxenv: "py39" }
+          - { python: "3.10", toxenv: "py310" }
+          - { python: "3.11", toxenv: "py311" }
+          - { python: "3.11", toxenv: "codestyle" }
+
+    steps:
+      - uses: actions/checkout@v3
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v4
+        with:
+          python-version: ${{ matrix.cfg.python}}
+      - name: Install Apache Ignite
+        run: |
+          curl -L 
https://apache-mirror.rbc.ru/pub/apache/ignite/${IGNITE_VERSION}/apache-ignite-slim-${IGNITE_VERSION}-bin.zip
 > ignite.zip
+          unzip ignite.zip -d /opt
+          mv /opt/apache-ignite-slim-${IGNITE_VERSION}-bin /opt/ignite
+          mv /opt/ignite/libs/optional/ignite-log4j2 /opt/ignite/libs/
+
+      - name: Install tox
+        run: |
+          pip install tox
+
+      - name: Run tests
+        run: |
+          tox -e ${{ matrix.cfg.toxenv }}
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index 45f26f6..0000000
--- a/.travis.yml
+++ /dev/null
@@ -1,54 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-language: python
-sudo: required
-
-addons:
-  apt:
-    packages:
-      - openjdk-8-jdk
-
-env:
-  global:
-    - IGNITE_VERSION=2.13.0
-    - IGNITE_HOME=/opt/ignite
-
-before_install:
-  - curl -L 
https://apache-mirror.rbc.ru/pub/apache/ignite/${IGNITE_VERSION}/apache-ignite-slim-${IGNITE_VERSION}-bin.zip
 > ignite.zip
-  - unzip ignite.zip -d /opt
-  - mv /opt/apache-ignite-slim-${IGNITE_VERSION}-bin /opt/ignite
-  - mv /opt/ignite/libs/optional/ignite-log4j2 /opt/ignite/libs/
-
-jobs:
-  include:
-    - python: '3.7'
-      arch: amd64
-      env: TOXENV=py37
-    - python: '3.8'
-      arch: amd64
-      env: TOXENV=py38
-    - python: '3.8'
-      arch: amd64
-      env: TOXENV=codestyle
-    - python: '3.9'
-      arch: amd64
-      env: TOXENV=py39
-    - python: '3.10'
-      arch: amd64
-      env: TOXENV=py310
-
-install: pip install tox
-script: tox
diff --git a/pyignite/aio_cache.py b/pyignite/aio_cache.py
index 7a92a9a..e9d8b4c 100644
--- a/pyignite/aio_cache.py
+++ b/pyignite/aio_cache.py
@@ -97,18 +97,24 @@ class AioCache(BaseCache):
             return tx_conn
         return await self.client.get_best_node(self, key, key_hint)
 
-    async def settings(self) -> Optional[dict]:
+    async def settings(self, timeout: Union[int, float] = 0) -> Optional[dict]:
         """
         Lazy Cache settings. See the :ref:`example <sql_cache_read>`
         of reading this property.
 
         All cache properties are documented here: :ref:`cache_props`.
 
+        :param timeout: (optional) request timeout.
         :return: dict of cache properties and their values.
         """
         if self._settings is None:
             conn = await self._get_best_node()
-            config_result = await cache_get_configuration_async(conn, 
self.cache_info)
+
+            config_result_coro = cache_get_configuration_async(conn, 
self.cache_info)
+            if timeout:
+                config_result = await asyncio.wait_for(config_result_coro, 
timeout)
+            else:
+                config_result = await config_result_coro
 
             if config_result.status == 0:
                 self._settings = config_result.value
@@ -118,21 +124,24 @@ class AioCache(BaseCache):
         return self._settings
 
     @status_to_exception(CacheError)
-    async def destroy(self):
+    async def destroy(self, timeout: Union[int, float] = 0):
         """
         Destroys cache with a given name.
+
+        :param timeout: (optional) request timeout.
         """
         conn = await self._get_best_node()
         return await cache_destroy_async(conn, self.cache_id)
 
     @status_to_exception(CacheError)
-    async def get(self, key, key_hint: object = None) -> Any:
+    async def get(self, key, key_hint: object = None, timeout: Union[int, 
float] = 0) -> Any:
         """
         Retrieves a value from cache by key.
 
         :param key: key for the cache entry. Can be of any supported type,
         :param key_hint: (optional) Ignite data type, for which the given key
          should be converted,
+        :param timeout: (optional) request timeout.
         :return: value retrieved.
         """
         if key_hint is None:
@@ -144,7 +153,7 @@ class AioCache(BaseCache):
         return result
 
     @status_to_exception(CacheError)
-    async def put(self, key, value, key_hint: object = None, value_hint: 
object = None):
+    async def put(self, key, value, key_hint: object = None, value_hint: 
object = None, timeout: Union[int, float] = 0):
         """
         Puts a value with a given key to cache (overwriting existing value
         if any).
@@ -154,7 +163,8 @@ class AioCache(BaseCache):
         :param key_hint: (optional) Ignite data type, for which the given key
          should be converted,
         :param value_hint: (optional) Ignite data type, for which the given
-         value should be converted.
+         value should be converted,
+        :param timeout: (optional) request timeout.
         """
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
@@ -163,11 +173,12 @@ class AioCache(BaseCache):
         return await cache_put_async(conn, self.cache_info, key, value, 
key_hint=key_hint, value_hint=value_hint)
 
     @status_to_exception(CacheError)
-    async def get_all(self, keys: list) -> dict:
+    async def get_all(self, keys: list, timeout: Union[int, float] = 0) -> 
dict:
         """
         Retrieves multiple key-value pairs from cache.
 
         :param keys: list of keys or tuples of (key, key_hint),
+        :param timeout: (optional) request timeout,
         :return: a dict of key-value pairs.
         """
         conn = await self._get_best_node()
@@ -181,7 +192,7 @@ class AioCache(BaseCache):
         return result
 
     @status_to_exception(CacheError)
-    async def put_all(self, pairs: dict):
+    async def put_all(self, pairs: dict, timeout: Union[int, float] = 0):
         """
         Puts multiple key-value pairs to cache (overwriting existing
         associations if any).
@@ -189,12 +200,14 @@ class AioCache(BaseCache):
         :param pairs: dictionary type parameters, contains key-value pairs
          to save. Each key or value can be an item of representable
          Python type or a tuple of (item, hint),
+        :param timeout: (optional) request timeout.
         """
         conn = await self._get_best_node()
         return await cache_put_all_async(conn, self.cache_info, pairs)
 
     @status_to_exception(CacheError)
-    async def replace(self, key, value, key_hint: object = None, value_hint: 
object = None):
+    async def replace(self, key, value, key_hint: object = None, value_hint: 
object = None,
+                      timeout: Union[int, float] = 0):
         """
         Puts a value with a given key to cache only if the key already exist.
 
@@ -203,7 +216,8 @@ class AioCache(BaseCache):
         :param key_hint: (optional) Ignite data type, for which the given key
          should be converted,
         :param value_hint: (optional) Ignite data type, for which the given
-         value should be converted.
+         value should be converted,
+        :param timeout: (optional) request timeout.
         """
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
@@ -214,12 +228,13 @@ class AioCache(BaseCache):
         return result
 
     @status_to_exception(CacheError)
-    async def clear(self, keys: Optional[list] = None):
+    async def clear(self, keys: Optional[list] = None, timeout: Union[int, 
float] = 0):
         """
         Clears the cache without notifying listeners or cache writers.
 
         :param keys: (optional) list of cache keys or (key, key type
          hint) tuples to clear (default: clear all).
+        :param timeout: (optional) request timeout.
         """
         conn = await self._get_best_node()
         if keys:
@@ -228,13 +243,14 @@ class AioCache(BaseCache):
             return await cache_clear_async(conn, self.cache_info)
 
     @status_to_exception(CacheError)
-    async def clear_key(self, key, key_hint: object = None):
+    async def clear_key(self, key, key_hint: object = None, timeout: 
Union[int, float] = 0):
         """
         Clears the cache key without notifying listeners or cache writers.
 
         :param key: key for the cache entry,
         :param key_hint: (optional) Ignite data type, for which the given key
          should be converted,
+        :param timeout: (optional) request timeout.
         """
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
@@ -243,23 +259,25 @@ class AioCache(BaseCache):
         return await cache_clear_key_async(conn, self.cache_info, key, 
key_hint=key_hint)
 
     @status_to_exception(CacheError)
-    async def clear_keys(self, keys: Iterable):
+    async def clear_keys(self, keys: Iterable, timeout: Union[int, float] = 0):
         """
         Clears the cache key without notifying listeners or cache writers.
 
         :param keys: a list of keys or (key, type hint) tuples
+        :param timeout: (optional) request timeout.
         """
         conn = await self._get_best_node()
         return await cache_clear_keys_async(conn, self.cache_info, keys)
 
     @status_to_exception(CacheError)
-    async def contains_key(self, key, key_hint=None) -> bool:
+    async def contains_key(self, key, key_hint=None, timeout: Union[int, 
float] = 0) -> bool:
         """
         Returns a value indicating whether given key is present in cache.
 
         :param key: key for the cache entry. Can be of any supported type,
         :param key_hint: (optional) Ignite data type, for which the given key
          should be converted,
+        :param timeout: (optional) request timeout,
         :return: boolean `True` when key is present, `False` otherwise.
         """
         if key_hint is None:
@@ -269,18 +287,19 @@ class AioCache(BaseCache):
         return await cache_contains_key_async(conn, self.cache_info, key, 
key_hint=key_hint)
 
     @status_to_exception(CacheError)
-    async def contains_keys(self, keys: Iterable) -> bool:
+    async def contains_keys(self, keys: Iterable, timeout: Union[int, float] = 
0) -> bool:
         """
         Returns a value indicating whether all given keys are present in cache.
 
         :param keys: a list of keys or (key, type hint) tuples,
+        :param timeout: (optional) request timeout,
         :return: boolean `True` when all keys are present, `False` otherwise.
         """
         conn = await self._get_best_node()
         return await cache_contains_keys_async(conn, self.cache_info, keys)
 
     @status_to_exception(CacheError)
-    async def get_and_put(self, key, value, key_hint=None, value_hint=None) -> 
Any:
+    async def get_and_put(self, key, value, key_hint=None, value_hint=None, 
timeout: Union[int, float] = 0) -> Any:
         """
         Puts a value with a given key to cache, and returns the previous value
         for that key, or null value if there was not such key.
@@ -290,7 +309,8 @@ class AioCache(BaseCache):
         :param key_hint: (optional) Ignite data type, for which the given key
          should be converted,
         :param value_hint: (optional) Ignite data type, for which the given
-         value should be converted.
+         value should be converted,
+        :param timeout: (optional) request timeout,
         :return: old value or None.
         """
         if key_hint is None:
@@ -303,7 +323,7 @@ class AioCache(BaseCache):
         return result
 
     @status_to_exception(CacheError)
-    async def get_and_put_if_absent(self, key, value, key_hint=None, 
value_hint=None):
+    async def get_and_put_if_absent(self, key, value, key_hint=None, 
value_hint=None, timeout: Union[int, float] = 0):
         """
         Puts a value with a given key to cache only if the key does not
         already exist.
@@ -314,6 +334,7 @@ class AioCache(BaseCache):
          should be converted,
         :param value_hint: (optional) Ignite data type, for which the given
          value should be converted,
+        :param timeout: (optional) request timeout,
         :return: old value or None.
         """
         if key_hint is None:
@@ -325,7 +346,7 @@ class AioCache(BaseCache):
         return result
 
     @status_to_exception(CacheError)
-    async def put_if_absent(self, key, value, key_hint=None, value_hint=None):
+    async def put_if_absent(self, key, value, key_hint=None, value_hint=None, 
timeout: Union[int, float] = 0):
         """
         Puts a value with a given key to cache only if the key does not
         already exist.
@@ -335,7 +356,8 @@ class AioCache(BaseCache):
         :param key_hint: (optional) Ignite data type, for which the given key
          should be converted,
         :param value_hint: (optional) Ignite data type, for which the given
-         value should be converted.
+         value should be converted,
+        :param timeout: (optional) request timeout,
         """
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
@@ -344,13 +366,14 @@ class AioCache(BaseCache):
         return await cache_put_if_absent_async(conn, self.cache_info, key, 
value, key_hint, value_hint)
 
     @status_to_exception(CacheError)
-    async def get_and_remove(self, key, key_hint=None) -> Any:
+    async def get_and_remove(self, key, key_hint=None, timeout: Union[int, 
float] = 0) -> Any:
         """
         Removes the cache entry with specified key, returning the value.
 
         :param key: key for the cache entry. Can be of any supported type,
         :param key_hint: (optional) Ignite data type, for which the given key
          should be converted,
+        :param timeout: (optional) request timeout,
         :return: old value or None.
         """
         if key_hint is None:
@@ -362,7 +385,7 @@ class AioCache(BaseCache):
         return result
 
     @status_to_exception(CacheError)
-    async def get_and_replace(self, key, value, key_hint=None, 
value_hint=None) -> Any:
+    async def get_and_replace(self, key, value, key_hint=None, 
value_hint=None, timeout: Union[int, float] = 0) -> Any:
         """
         Puts a value with a given key to cache, returning previous value
         for that key, if and only if there is a value currently mapped
@@ -373,7 +396,8 @@ class AioCache(BaseCache):
         :param key_hint: (optional) Ignite data type, for which the given key
          should be converted,
         :param value_hint: (optional) Ignite data type, for which the given
-         value should be converted.
+         value should be converted,
+        :param timeout: (optional) request timeout,
         :return: old value or None.
         """
         if key_hint is None:
@@ -385,13 +409,14 @@ class AioCache(BaseCache):
         return result
 
     @status_to_exception(CacheError)
-    async def remove_key(self, key, key_hint=None):
+    async def remove_key(self, key, key_hint=None, timeout: Union[int, float] 
= 0):
         """
         Clears the cache key without notifying listeners or cache writers.
 
         :param key: key for the cache entry,
         :param key_hint: (optional) Ignite data type, for which the given key
          should be converted,
+        :param timeout: (optional) request timeout.
         """
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
@@ -400,26 +425,29 @@ class AioCache(BaseCache):
         return await cache_remove_key_async(conn, self.cache_info, key, 
key_hint)
 
     @status_to_exception(CacheError)
-    async def remove_keys(self, keys: list):
+    async def remove_keys(self, keys: list, timeout: Union[int, float] = 0):
         """
         Removes cache entries by given list of keys, notifying listeners
         and cache writers.
 
-        :param keys: list of keys or tuples of (key, key_hint) to remove.
+        :param keys: list of keys or tuples of (key, key_hint) to remove,
+        :param timeout: (optional) request timeout.
         """
         conn = await self._get_best_node()
         return await cache_remove_keys_async(conn, self.cache_info, keys)
 
     @status_to_exception(CacheError)
-    async def remove_all(self):
+    async def remove_all(self, timeout: Union[int, float] = 0):
         """
         Removes all cache entries, notifying listeners and cache writers.
+
+        :param timeout: (optional) request timeout.
         """
         conn = await self._get_best_node()
         return await cache_remove_all_async(conn, self.cache_info)
 
     @status_to_exception(CacheError)
-    async def remove_if_equals(self, key, sample, key_hint=None, 
sample_hint=None):
+    async def remove_if_equals(self, key, sample, key_hint=None, 
sample_hint=None, timeout: Union[int, float] = 0):
         """
         Removes an entry with a given key if provided value is equal to
         actual value, notifying listeners and cache writers.
@@ -429,7 +457,8 @@ class AioCache(BaseCache):
         :param key_hint: (optional) Ignite data type, for which the given key
          should be converted,
         :param sample_hint: (optional) Ignite data type, for whic
-         the given sample should be converted.
+         the given sample should be converted,
+        :param timeout: (optional) request timeout.
         """
         if key_hint is None:
             key_hint = AnyDataObject.map_python_type(key)
@@ -438,7 +467,8 @@ class AioCache(BaseCache):
         return await cache_remove_if_equals_async(conn, self.cache_info, key, 
sample, key_hint, sample_hint)
 
     @status_to_exception(CacheError)
-    async def replace_if_equals(self, key, sample, value, key_hint=None, 
sample_hint=None, value_hint=None) -> Any:
+    async def replace_if_equals(self, key, sample, value, key_hint=None, 
sample_hint=None, value_hint=None,
+                                timeout: Union[int, float] = 0) -> Any:
         """
         Puts a value with a given key to cache only if the key already exists
         and value equals provided sample.
@@ -452,6 +482,7 @@ class AioCache(BaseCache):
          the given sample should be converted
         :param value_hint: (optional) Ignite data type, for which the given
          value should be converted,
+        :param timeout: (optional) request timeout,
         :return: boolean `True` when key is present, `False` otherwise.
         """
         if key_hint is None:
@@ -464,19 +495,21 @@ class AioCache(BaseCache):
         return result
 
     @status_to_exception(CacheError)
-    async def get_size(self, peek_modes=None):
+    async def get_size(self, peek_modes=None, timeout: Union[int, float] = 0):
         """
         Gets the number of entries in cache.
 
         :param peek_modes: (optional) limit count to near cache partition
          (PeekModes.NEAR), primary cache (PeekModes.PRIMARY), or backup cache
          (PeekModes.BACKUP). Defaults to primary cache partitions 
(PeekModes.PRIMARY),
+        :param timeout: (optional) request timeout,
         :return: integer number of cache entries.
         """
         conn = await self._get_best_node()
         return await cache_get_size_async(conn, self.cache_info, peek_modes)
 
-    def scan(self, page_size: int = 1, partitions: int = -1, local: bool = 
False) -> AioScanCursor:
+    def scan(self, page_size: int = 1, partitions: int = -1, local: bool = 
False,
+             timeout: Union[int, float] = 0) -> AioScanCursor:
         """
         Returns all key-value pairs from the cache, similar to `get_all`, but
         with internal pagination, which is slower, but safer.
diff --git a/pyignite/aio_cluster.py b/pyignite/aio_cluster.py
index afbc41b..8f7aa27 100644
--- a/pyignite/aio_cluster.py
+++ b/pyignite/aio_cluster.py
@@ -36,7 +36,7 @@ class AioCluster:
         """
         self._client = client
 
-    @status_to_exception(ClusterError)
+    @status_to_exception(ClusterError, ignore_timeout=True)
     async def get_state(self) -> 'ClusterState':
         """
         Gets current cluster state.
@@ -48,7 +48,7 @@ class AioCluster:
         """
         return await cluster_get_state_async(await self._client.random_node())
 
-    @status_to_exception(ClusterError)
+    @status_to_exception(ClusterError, ignore_timeout=True)
     async def set_state(self, state: 'ClusterState'):
         """
         Changes current cluster state to the given.
diff --git a/pyignite/connection/aio_connection.py 
b/pyignite/connection/aio_connection.py
index 4d13d6e..13ab681 100644
--- a/pyignite/connection/aio_connection.py
+++ b/pyignite/connection/aio_connection.py
@@ -57,7 +57,8 @@ class BaseProtocol(asyncio.Protocol):
         try:
             self.__send_handshake(transport, self._conn)
         except Exception as e:
-            self._handshake_fut.set_exception(e)
+            if not self._handshake_fut.done():
+                self._handshake_fut.set_exception(e)
 
     def data_received(self, data: bytes) -> None:
         self._buffer += data
@@ -67,7 +68,7 @@ class BaseProtocol(asyncio.Protocol):
             if not self._handshake_fut.done():
                 hs_response = self.__parse_handshake(packet, self._conn.client)
                 self._handshake_fut.set_result(hs_response)
-            else:
+            elif not self._handshake_fut.cancelled() or not 
self._handshake_fut.exception():
                 self._conn.process_message(packet)
             self._buffer = self._buffer[packet_sz:len(self._buffer)]
 
@@ -203,7 +204,8 @@ class AioConnection(BaseConnection):
     def process_connection_lost(self, err, reconnect=False):
         self.failed = True
         for _, fut in self._pending_reqs.items():
-            fut.set_exception(err)
+            if not fut.done():
+                fut.set_exception(err)
         self._pending_reqs.clear()
 
         if self._transport_closed_fut and not 
self._transport_closed_fut.done():
@@ -215,8 +217,11 @@ class AioConnection(BaseConnection):
 
     def process_message(self, data):
         req_id = int.from_bytes(data[4:12], byteorder=PROTOCOL_BYTE_ORDER, 
signed=True)
-        if req_id in self._pending_reqs:
-            self._pending_reqs[req_id].set_result(data)
+
+        req_fut = self._pending_reqs.get(req_id)
+        if req_fut:
+            if not req_fut.done():
+                req_fut.set_result(data)
             del self._pending_reqs[req_id]
 
     async def _connect_version(self) -> Union[dict, OrderedDict]:
diff --git a/pyignite/utils.py b/pyignite/utils.py
index 5fcbd38..ed0b5c6 100644
--- a/pyignite/utils.py
+++ b/pyignite/utils.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import asyncio
 import ctypes
 import decimal
 import inspect
@@ -225,12 +225,13 @@ def datetime_hashcode(value: int) -> int:
     return (value & LONG_MASK) ^ (unsigned(value, ctypes.c_ulonglong) >> 32)
 
 
-def status_to_exception(exc: Type[Exception]):
+def status_to_exception(exc: Type[Exception], ignore_timeout=False):
     """
-    Converts erroneous status code with error message to an exception
-    of the given class. Supports coroutines.
+    Converts erroneous status code with error message to an exception with 
type of the given class. Supports coroutines.
+    Also, support `timeout` argument for decorated async function.
 
     :param exc: the class of exception to raise,
+    :param ignore_timeout: If set, ignore `timeout` argument.
     :return: decorated function.
     """
     def process_result(result):
@@ -242,7 +243,14 @@ def status_to_exception(exc: Type[Exception]):
         if inspect.iscoroutinefunction(fn):
             @wraps(fn)
             async def ste_wrapper_async(*args, **kwargs):
-                return process_result(await fn(*args, **kwargs))
+                timeout = kwargs.pop('timeout', 0)
+                if timeout and not ignore_timeout:
+                    result = await asyncio.wait_for(fn(*args, **kwargs), 
timeout)
+                else:
+                    result = await fn(*args, **kwargs)
+
+                return process_result(result)
+
             return ste_wrapper_async
         else:
             @wraps(fn)
diff --git a/requirements/install.txt b/requirements/install.txt
index aa8290f..feb4eb6 100644
--- a/requirements/install.txt
+++ b/requirements/install.txt
@@ -1,4 +1,3 @@
 # these pip packages are necessary for the pyignite to run
 
 attrs>=20.3.0
-contextvars>=2.4;python_version<"3.7"
diff --git a/requirements/tests.txt b/requirements/tests.txt
index 7262fe9..5dc815a 100644
--- a/requirements/tests.txt
+++ b/requirements/tests.txt
@@ -1,7 +1,6 @@
 # these packages are used for testing
 
-async_generator==1.10; python_version < '3.7'
-pytest==6.2.2
+pytest==6.2.5
 pytest-cov==2.11.1
 pytest-asyncio==0.14.0
 teamcity-messages==1.28
diff --git a/setup.py b/setup.py
index 91a72f5..827066a 100644
--- a/setup.py
+++ b/setup.py
@@ -110,6 +110,7 @@ def run_setup(with_binary=True):
             'Programming Language :: Python :: 3.8',
             'Programming Language :: Python :: 3.9',
             'Programming Language :: Python :: 3.10',
+            'Programming Language :: Python :: 3.11',
             'Programming Language :: Python :: 3 :: Only',
             'Intended Audience :: Developers',
             'Topic :: Database :: Front-Ends',
diff --git a/tests/affinity/test_affinity_request_routing.py 
b/tests/affinity/test_affinity_request_routing.py
index b73eff3..fbb4b82 100644
--- a/tests/affinity/test_affinity_request_routing.py
+++ b/tests/affinity/test_affinity_request_routing.py
@@ -28,11 +28,6 @@ from pyignite.datatypes.prop_codes import PROP_NAME, 
PROP_BACKUPS_NUMBER, PROP_C
 from pyignite.monitoring import QueryEventListener
 from tests.util import wait_for_condition, wait_for_condition_async, 
start_ignite, kill_process_tree
 
-try:
-    from contextlib import asynccontextmanager
-except ImportError:
-    from async_generator import asynccontextmanager
-
 requests = deque()
 
 
@@ -401,7 +396,7 @@ def create_caches(client):
                 pass
 
 
-@asynccontextmanager
[email protected]
 async def create_caches_async(client):
     caches = []
     try:
diff --git a/tests/custom/test_timeouts.py b/tests/custom/test_timeouts.py
new file mode 100644
index 0000000..ef01af1
--- /dev/null
+++ b/tests/custom/test_timeouts.py
@@ -0,0 +1,273 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import sys
+from asyncio import TimeoutError, InvalidStateError
+
+import pytest
+
+from pyignite import AioClient
+from pyignite.aio_cache import AioCache
+from pyignite.datatypes.key_value import PeekModes
+from tests.util import start_ignite_gen
+
+
[email protected](scope='module', autouse=True)
+def server1():
+    yield from start_ignite_gen(idx=1)
+
+
[email protected](autouse=True)
+async def proxy(event_loop, server1, cache):
+    proxy = ProxyServer(("127.0.0.1", 10802), ("127.0.0.1", 10801))
+    try:
+        await proxy.start()
+        yield proxy
+    finally:
+        await proxy.close()
+
+
[email protected](scope='module', autouse=True)
+async def cache(server1):
+    c = AioClient(partition_aware=False)
+    async with c.connect("127.0.0.1", 10801):
+        try:
+            cache = await c.get_or_create_cache("test")
+            yield cache
+        finally:
+            await cache.destroy()
+
+
[email protected](autouse=True)
+def invalid_states_errors():
+    errors = []
+
+    def trace(_, event, arg):
+        if event == 'exception':
+            etype, _, _ = arg
+            if etype is InvalidStateError:
+                errors.append(arg)
+
+        return trace
+
+    try:
+        sys.settrace(trace)
+        yield errors
+    finally:
+        sys.settrace(None)
+
+
+cache_method_params = (
+    "method,targs",
+    [
+        (AioCache.get, (1,)),
+        (AioCache.put, (1, 1)),
+        (AioCache.get_all, ([1, 1],)),
+        (AioCache.put_all, ({1: 1},)),
+        (AioCache.replace, (1, 1)),
+        (AioCache.clear, ()),
+        (AioCache.clear_key, (1,)),
+        (AioCache.clear_keys, ([1, 1],)),
+        (AioCache.contains_key, (1,)),
+        (AioCache.contains_keys, ([1, 1],)),
+        (AioCache.get_and_put, (1, 1)),
+        (AioCache.get_and_put_if_absent, (1, 1)),
+        (AioCache.put_if_absent, (1, 1)),
+        (AioCache.get_and_remove, (1,)),
+        (AioCache.get_and_replace, (1, 1)),
+        (AioCache.remove_key, (1,)),
+        (AioCache.remove_keys, ([1, 1],)),
+        (AioCache.remove_all, ()),
+        (AioCache.remove_if_equals, (1, 1)),
+        (AioCache.replace_if_equals, (1, 1, 1)),
+        (AioCache.get_size, ([PeekModes.PRIMARY, PeekModes.BACKUP],)),
+        (AioCache.get_size, ()),
+        (AioCache.settings, ())
+    ]
+)
+
+
[email protected](*cache_method_params)
[email protected]
+async def test_cancellation_on_slow_response(event_loop, proxy, 
invalid_states_errors,
+                                             method, targs):
+    c = AioClient(partition_aware=False)
+    async with c.connect("127.0.0.1", 10802):
+        cache = await c.get_cache("test")
+
+        proxy.slow_response = True
+        with pytest.raises(TimeoutError):
+            await method(cache, *targs, timeout=0.1)
+
+        proxy.slow_response = False
+        assert len(invalid_states_errors) == 0
+
+
[email protected](*cache_method_params)
[email protected]
+async def test_cancellation_on_disconnect(event_loop, proxy, 
invalid_states_errors,
+                                          method, targs):
+    c = AioClient(partition_aware=False)
+    async with c.connect("127.0.0.1", 10802):
+        cache = await c.get_cache("test")
+        proxy.discard_response = True
+
+        result = asyncio.ensure_future(method(cache, *targs, timeout=0.1))
+        await asyncio.sleep(0.2)
+        await proxy.disconnect_peers()
+
+        with pytest.raises(TimeoutError):
+            await result
+
+    assert len(invalid_states_errors) == 0
+
+
+class ProxyServer:
+    """
+    Proxy for simulating discarding response or slow response from ignite 
server
+    Set `discard_response` or `slow_response` to `True` to simulate this 
condition.
+    Set `slow_response_timeout` to change sleep time interval for slow 
responses.
+    Call `disconnect_peers()` in order to simulate lost connection to Ignite 
server.
+    """
+    def __init__(self, local_host, remote_host):
+        self.local_host = local_host
+        self.remote_host = remote_host
+        self.peers = {}
+        self.discard_response, self.slow_response = False, False
+        self.slow_response_timeout = 0.1
+        self.server = None
+
+    async def start(self):
+        loop = asyncio.get_event_loop()
+        host, port = self.local_host
+        self.server = await loop.create_server(
+            lambda: ProxyTcpProtocol(self), host=host, port=port)
+
+    async def disconnect_peers(self):
+        peers = dict(self.peers)
+        for k, v in peers.items():
+            if not v:
+                return
+
+            local, remote = v
+            if local:
+                await remote.close()
+            if remote:
+                await local.close()
+
+    async def close(self):
+        try:
+            await self.disconnect_peers()
+        except TimeoutError:
+            pass
+
+        self.server.close()
+
+
+class ProxyTcpProtocol(asyncio.Protocol):
+    def __init__(self, proxy):
+        self.addr, self.port = proxy.remote_host
+        self.proxy = proxy
+        self.transport, self.remote_protocol = None, None
+        self.conn_info, self.close_fut = None, None
+        super().__init__()
+
+    def connection_made(self, transport):
+        self.transport = transport
+        self.conn_info = transport.get_extra_info("peername")
+
+    def data_received(self, data):
+        if self.remote_protocol and self.remote_protocol.transport:
+            self.remote_protocol.transport.write(data)
+            return
+
+        loop = asyncio.get_event_loop()
+        self.remote_protocol = RemoteTcpProtocol(self.proxy, self, data)
+        coro = loop.create_connection(lambda: self.remote_protocol, 
host=self.addr, port=self.port)
+        asyncio.ensure_future(coro)
+
+        self.proxy.peers[self.conn_info] = (self, self.remote_protocol)
+
+    async def close(self):
+        if not self.transport:
+            return
+
+        self.close_fut = asyncio.get_event_loop().create_future()
+        self.transport.close()
+
+        try:
+            await asyncio.wait_for(self.close_fut, 0.1)
+        except TimeoutError:
+            pass
+
+    def connection_lost(self, exc):
+        if self.close_fut:
+            self.close_fut.done()
+
+
+class RemoteTcpProtocol(asyncio.Protocol):
+    def __init__(self, proxy, proxy_protocol, data):
+        self.proxy = proxy
+        self.proxy_protocol = proxy_protocol
+        self.data = data
+        self.transport, self.close_fut = None, None
+        self.queue = []
+        self.loop_task = asyncio.ensure_future(self.send_loop())
+        self.stop_flag = False
+        super().__init__()
+
+    def connection_made(self, transport):
+        self.transport = transport
+        self.transport.write(self.data)
+
+    async def close(self):
+        if not self.transport:
+            return
+
+        self.stop_flag = True
+        self.close_fut = asyncio.get_event_loop().create_future()
+        self.transport.close()
+        try:
+            await asyncio.wait_for(self.loop_task, 0.1)
+        except TimeoutError:
+            pass
+
+        try:
+            await asyncio.wait_for(self.close_fut, 0.1)
+        except TimeoutError:
+            pass
+
+    def connection_lost(self, exc):
+        if self.close_fut:
+            self.close_fut.done()
+
+    async def send_loop(self):
+        while not self.stop_flag:
+            if not self.queue:
+                await asyncio.sleep(0.01)
+                continue
+
+            packet = self.queue.pop()
+            if packet:
+                if self.proxy.slow_response:
+                    await asyncio.sleep(self.proxy.slow_response_timeout)
+                self.proxy_protocol.transport.write(packet)
+
+    def data_received(self, data):
+        if self.proxy.discard_response:
+            return
+
+        self.queue.append(data)
diff --git a/tests/util.py b/tests/util.py
index af3b70e..6101a09 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -27,12 +27,6 @@ import subprocess
 import time
 
 
-try:
-    from contextlib import asynccontextmanager
-except ImportError:
-    from async_generator import asynccontextmanager
-
-
 @contextlib.contextmanager
 def get_or_create_cache(client, settings):
     cache = client.get_or_create_cache(settings)
@@ -42,7 +36,7 @@ def get_or_create_cache(client, settings):
         cache.destroy()
 
 
-@asynccontextmanager
[email protected]
 async def get_or_create_cache_async(client, settings):
     cache = await client.get_or_create_cache(settings)
     try:
@@ -114,15 +108,6 @@ def get_ignite_runner():
     raise Exception(f"Ignite not found. IGNITE_HOME 
{os.getenv('IGNITE_HOME')}")
 
 
-def get_ignite_config_path(use_ssl=False):
-    if use_ssl:
-        file_name = "ignite-config-ssl.xml"
-    else:
-        file_name = "ignite-config.xml.jinja2"
-
-    return os.path.join(get_test_dir(), "config", file_name)
-
-
 def check_server_started(idx=1):
     pattern = re.compile('^Topology snapshot.*')
 
diff --git a/tox.ini b/tox.ini
index e873e21..d68f02e 100644
--- a/tox.ini
+++ b/tox.ini
@@ -15,7 +15,7 @@
 
 [tox]
 skipsdist = True
-envlist = codestyle,py{37,38,39,310}
+envlist = codestyle,py{37,38,39,310,311}
 
 [pytest]
 log_format = %(asctime)s %(name)s %(levelname)s %(message)s
@@ -43,6 +43,6 @@ usedevelop = True
 commands =
     pytest {env:PYTESTARGS:} {posargs} --force-cext --examples
 
-[testenv:py{36,37,38,39}-jenkins]
+[testenv:py{37,38,39,310,311}-jenkins]
 setenv:
     PYTESTARGS = --junitxml=junit-{envname}.xml


Reply via email to