This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-python-thin-client.git
The following commit(s) were added to refs/heads/master by this push:
new 94e619a IGNITE-18006 Support timeouts in async cache operations (#58)
94e619a is described below
commit 94e619afe3b1b58839670df2993a0e61d0e8d863
Author: Ivan Daschinskiy <[email protected]>
AuthorDate: Mon Nov 7 13:58:22 2022 +0300
IGNITE-18006 Support timeouts in async cache operations (#58)
---
.github/workflows/pr_check.yml | 61 ++++++
.travis.yml | 54 -----
pyignite/aio_cache.py | 97 ++++++---
pyignite/aio_cluster.py | 4 +-
pyignite/connection/aio_connection.py | 15 +-
pyignite/utils.py | 18 +-
requirements/install.txt | 1 -
requirements/tests.txt | 3 +-
setup.py | 1 +
tests/affinity/test_affinity_request_routing.py | 7 +-
tests/custom/test_timeouts.py | 273 ++++++++++++++++++++++++
tests/util.py | 17 +-
tox.ini | 4 +-
13 files changed, 430 insertions(+), 125 deletions(-)
diff --git a/.github/workflows/pr_check.yml b/.github/workflows/pr_check.yml
new file mode 100644
index 0000000..2330e1d
--- /dev/null
+++ b/.github/workflows/pr_check.yml
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Check code style and run tests
+on:
+ pull_request:
+ push:
+ branches:
+ - master
+ - 'pyignite-*'
+
+env:
+ IGNITE_VERSION: 2.14.0
+ IGNITE_HOME: /opt/ignite
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+ strategy:
+ fail-fast: false
+ matrix:
+ cfg:
+ - { python: "3.7", toxenv: "py37" }
+ - { python: "3.8", toxenv: "py38" }
+ - { python: "3.9", toxenv: "py39" }
+ - { python: "3.10", toxenv: "py310" }
+ - { python: "3.11", toxenv: "py311" }
+ - { python: "3.11", toxenv: "codestyle" }
+
+ steps:
+ - uses: actions/checkout@v3
+ # The matrix only defines `cfg.python`; `matrix.python-version` does not
+ # exist and would render as an empty string in the step name.
+ - name: Set up Python ${{ matrix.cfg.python }}
+ uses: actions/setup-python@v4
+ with:
+ python-version: ${{ matrix.cfg.python }}
+ - name: Install Apache Ignite
+ run: |
+ curl -L https://apache-mirror.rbc.ru/pub/apache/ignite/${IGNITE_VERSION}/apache-ignite-slim-${IGNITE_VERSION}-bin.zip > ignite.zip
+ unzip ignite.zip -d /opt
+ mv /opt/apache-ignite-slim-${IGNITE_VERSION}-bin /opt/ignite
+ mv /opt/ignite/libs/optional/ignite-log4j2 /opt/ignite/libs/
+
+ - name: Install tox
+ run: |
+ pip install tox
+
+ - name: Run tests
+ run: |
+ tox -e ${{ matrix.cfg.toxenv }}
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index 45f26f6..0000000
--- a/.travis.yml
+++ /dev/null
@@ -1,54 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-language: python
-sudo: required
-
-addons:
- apt:
- packages:
- - openjdk-8-jdk
-
-env:
- global:
- - IGNITE_VERSION=2.13.0
- - IGNITE_HOME=/opt/ignite
-
-before_install:
- - curl -L
https://apache-mirror.rbc.ru/pub/apache/ignite/${IGNITE_VERSION}/apache-ignite-slim-${IGNITE_VERSION}-bin.zip
> ignite.zip
- - unzip ignite.zip -d /opt
- - mv /opt/apache-ignite-slim-${IGNITE_VERSION}-bin /opt/ignite
- - mv /opt/ignite/libs/optional/ignite-log4j2 /opt/ignite/libs/
-
-jobs:
- include:
- - python: '3.7'
- arch: amd64
- env: TOXENV=py37
- - python: '3.8'
- arch: amd64
- env: TOXENV=py38
- - python: '3.8'
- arch: amd64
- env: TOXENV=codestyle
- - python: '3.9'
- arch: amd64
- env: TOXENV=py39
- - python: '3.10'
- arch: amd64
- env: TOXENV=py310
-
-install: pip install tox
-script: tox
diff --git a/pyignite/aio_cache.py b/pyignite/aio_cache.py
index 7a92a9a..e9d8b4c 100644
--- a/pyignite/aio_cache.py
+++ b/pyignite/aio_cache.py
@@ -97,18 +97,24 @@ class AioCache(BaseCache):
return tx_conn
return await self.client.get_best_node(self, key, key_hint)
- async def settings(self) -> Optional[dict]:
+ async def settings(self, timeout: Union[int, float] = 0) -> Optional[dict]:
"""
Lazy Cache settings. See the :ref:`example <sql_cache_read>`
of reading this property.
All cache properties are documented here: :ref:`cache_props`.
+ :param timeout: (optional) request timeout.
:return: dict of cache properties and their values.
"""
if self._settings is None:
conn = await self._get_best_node()
- config_result = await cache_get_configuration_async(conn,
self.cache_info)
+
+ config_result_coro = cache_get_configuration_async(conn,
self.cache_info)
+ if timeout:
+ config_result = await asyncio.wait_for(config_result_coro,
timeout)
+ else:
+ config_result = await config_result_coro
if config_result.status == 0:
self._settings = config_result.value
@@ -118,21 +124,24 @@ class AioCache(BaseCache):
return self._settings
@status_to_exception(CacheError)
- async def destroy(self):
+ async def destroy(self, timeout: Union[int, float] = 0):
"""
Destroys cache with a given name.
+
+ :param timeout: (optional) request timeout.
"""
conn = await self._get_best_node()
return await cache_destroy_async(conn, self.cache_id)
@status_to_exception(CacheError)
- async def get(self, key, key_hint: object = None) -> Any:
+ async def get(self, key, key_hint: object = None, timeout: Union[int,
float] = 0) -> Any:
"""
Retrieves a value from cache by key.
:param key: key for the cache entry. Can be of any supported type,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
+ :param timeout: (optional) request timeout.
:return: value retrieved.
"""
if key_hint is None:
@@ -144,7 +153,7 @@ class AioCache(BaseCache):
return result
@status_to_exception(CacheError)
- async def put(self, key, value, key_hint: object = None, value_hint:
object = None):
+ async def put(self, key, value, key_hint: object = None, value_hint:
object = None, timeout: Union[int, float] = 0):
"""
Puts a value with a given key to cache (overwriting existing value
if any).
@@ -154,7 +163,8 @@ class AioCache(BaseCache):
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given
- value should be converted.
+ value should be converted,
+ :param timeout: (optional) request timeout.
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
@@ -163,11 +173,12 @@ class AioCache(BaseCache):
return await cache_put_async(conn, self.cache_info, key, value,
key_hint=key_hint, value_hint=value_hint)
@status_to_exception(CacheError)
- async def get_all(self, keys: list) -> dict:
+ async def get_all(self, keys: list, timeout: Union[int, float] = 0) ->
dict:
"""
Retrieves multiple key-value pairs from cache.
:param keys: list of keys or tuples of (key, key_hint),
+ :param timeout: (optional) request timeout,
:return: a dict of key-value pairs.
"""
conn = await self._get_best_node()
@@ -181,7 +192,7 @@ class AioCache(BaseCache):
return result
@status_to_exception(CacheError)
- async def put_all(self, pairs: dict):
+ async def put_all(self, pairs: dict, timeout: Union[int, float] = 0):
"""
Puts multiple key-value pairs to cache (overwriting existing
associations if any).
@@ -189,12 +200,14 @@ class AioCache(BaseCache):
:param pairs: dictionary type parameters, contains key-value pairs
to save. Each key or value can be an item of representable
Python type or a tuple of (item, hint),
+ :param timeout: (optional) request timeout.
"""
conn = await self._get_best_node()
return await cache_put_all_async(conn, self.cache_info, pairs)
@status_to_exception(CacheError)
- async def replace(self, key, value, key_hint: object = None, value_hint:
object = None):
+ async def replace(self, key, value, key_hint: object = None, value_hint:
object = None,
+ timeout: Union[int, float] = 0):
"""
Puts a value with a given key to cache only if the key already exist.
@@ -203,7 +216,8 @@ class AioCache(BaseCache):
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given
- value should be converted.
+ value should be converted,
+ :param timeout: (optional) request timeout.
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
@@ -214,12 +228,13 @@ class AioCache(BaseCache):
return result
@status_to_exception(CacheError)
- async def clear(self, keys: Optional[list] = None):
+ async def clear(self, keys: Optional[list] = None, timeout: Union[int,
float] = 0):
"""
Clears the cache without notifying listeners or cache writers.
:param keys: (optional) list of cache keys or (key, key type
hint) tuples to clear (default: clear all).
+ :param timeout: (optional) request timeout.
"""
conn = await self._get_best_node()
if keys:
@@ -228,13 +243,14 @@ class AioCache(BaseCache):
return await cache_clear_async(conn, self.cache_info)
@status_to_exception(CacheError)
- async def clear_key(self, key, key_hint: object = None):
+ async def clear_key(self, key, key_hint: object = None, timeout:
Union[int, float] = 0):
"""
Clears the cache key without notifying listeners or cache writers.
:param key: key for the cache entry,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
+ :param timeout: (optional) request timeout.
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
@@ -243,23 +259,25 @@ class AioCache(BaseCache):
return await cache_clear_key_async(conn, self.cache_info, key,
key_hint=key_hint)
@status_to_exception(CacheError)
- async def clear_keys(self, keys: Iterable):
+ async def clear_keys(self, keys: Iterable, timeout: Union[int, float] = 0):
"""
Clears the cache key without notifying listeners or cache writers.
:param keys: a list of keys or (key, type hint) tuples
+ :param timeout: (optional) request timeout.
"""
conn = await self._get_best_node()
return await cache_clear_keys_async(conn, self.cache_info, keys)
@status_to_exception(CacheError)
- async def contains_key(self, key, key_hint=None) -> bool:
+ async def contains_key(self, key, key_hint=None, timeout: Union[int,
float] = 0) -> bool:
"""
Returns a value indicating whether given key is present in cache.
:param key: key for the cache entry. Can be of any supported type,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
+ :param timeout: (optional) request timeout,
:return: boolean `True` when key is present, `False` otherwise.
"""
if key_hint is None:
@@ -269,18 +287,19 @@ class AioCache(BaseCache):
return await cache_contains_key_async(conn, self.cache_info, key,
key_hint=key_hint)
@status_to_exception(CacheError)
- async def contains_keys(self, keys: Iterable) -> bool:
+ async def contains_keys(self, keys: Iterable, timeout: Union[int, float] =
0) -> bool:
"""
Returns a value indicating whether all given keys are present in cache.
:param keys: a list of keys or (key, type hint) tuples,
+ :param timeout: (optional) request timeout,
:return: boolean `True` when all keys are present, `False` otherwise.
"""
conn = await self._get_best_node()
return await cache_contains_keys_async(conn, self.cache_info, keys)
@status_to_exception(CacheError)
- async def get_and_put(self, key, value, key_hint=None, value_hint=None) ->
Any:
+ async def get_and_put(self, key, value, key_hint=None, value_hint=None,
timeout: Union[int, float] = 0) -> Any:
"""
Puts a value with a given key to cache, and returns the previous value
for that key, or null value if there was not such key.
@@ -290,7 +309,8 @@ class AioCache(BaseCache):
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given
- value should be converted.
+ value should be converted,
+ :param timeout: (optional) request timeout,
:return: old value or None.
"""
if key_hint is None:
@@ -303,7 +323,7 @@ class AioCache(BaseCache):
return result
@status_to_exception(CacheError)
- async def get_and_put_if_absent(self, key, value, key_hint=None,
value_hint=None):
+ async def get_and_put_if_absent(self, key, value, key_hint=None,
value_hint=None, timeout: Union[int, float] = 0):
"""
Puts a value with a given key to cache only if the key does not
already exist.
@@ -314,6 +334,7 @@ class AioCache(BaseCache):
should be converted,
:param value_hint: (optional) Ignite data type, for which the given
value should be converted,
+ :param timeout: (optional) request timeout,
:return: old value or None.
"""
if key_hint is None:
@@ -325,7 +346,7 @@ class AioCache(BaseCache):
return result
@status_to_exception(CacheError)
- async def put_if_absent(self, key, value, key_hint=None, value_hint=None):
+ async def put_if_absent(self, key, value, key_hint=None, value_hint=None,
timeout: Union[int, float] = 0):
"""
Puts a value with a given key to cache only if the key does not
already exist.
@@ -335,7 +356,8 @@ class AioCache(BaseCache):
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given
- value should be converted.
+ value should be converted,
+ :param timeout: (optional) request timeout,
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
@@ -344,13 +366,14 @@ class AioCache(BaseCache):
return await cache_put_if_absent_async(conn, self.cache_info, key,
value, key_hint, value_hint)
@status_to_exception(CacheError)
- async def get_and_remove(self, key, key_hint=None) -> Any:
+ async def get_and_remove(self, key, key_hint=None, timeout: Union[int,
float] = 0) -> Any:
"""
Removes the cache entry with specified key, returning the value.
:param key: key for the cache entry. Can be of any supported type,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
+ :param timeout: (optional) request timeout,
:return: old value or None.
"""
if key_hint is None:
@@ -362,7 +385,7 @@ class AioCache(BaseCache):
return result
@status_to_exception(CacheError)
- async def get_and_replace(self, key, value, key_hint=None,
value_hint=None) -> Any:
+ async def get_and_replace(self, key, value, key_hint=None,
value_hint=None, timeout: Union[int, float] = 0) -> Any:
"""
Puts a value with a given key to cache, returning previous value
for that key, if and only if there is a value currently mapped
@@ -373,7 +396,8 @@ class AioCache(BaseCache):
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param value_hint: (optional) Ignite data type, for which the given
- value should be converted.
+ value should be converted,
+ :param timeout: (optional) request timeout,
:return: old value or None.
"""
if key_hint is None:
@@ -385,13 +409,14 @@ class AioCache(BaseCache):
return result
@status_to_exception(CacheError)
- async def remove_key(self, key, key_hint=None):
+ async def remove_key(self, key, key_hint=None, timeout: Union[int, float]
= 0):
"""
Clears the cache key without notifying listeners or cache writers.
:param key: key for the cache entry,
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
+ :param timeout: (optional) request timeout.
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
@@ -400,26 +425,29 @@ class AioCache(BaseCache):
return await cache_remove_key_async(conn, self.cache_info, key,
key_hint)
@status_to_exception(CacheError)
- async def remove_keys(self, keys: list):
+ async def remove_keys(self, keys: list, timeout: Union[int, float] = 0):
"""
Removes cache entries by given list of keys, notifying listeners
and cache writers.
- :param keys: list of keys or tuples of (key, key_hint) to remove.
+ :param keys: list of keys or tuples of (key, key_hint) to remove,
+ :param timeout: (optional) request timeout.
"""
conn = await self._get_best_node()
return await cache_remove_keys_async(conn, self.cache_info, keys)
@status_to_exception(CacheError)
- async def remove_all(self):
+ async def remove_all(self, timeout: Union[int, float] = 0):
"""
Removes all cache entries, notifying listeners and cache writers.
+
+ :param timeout: (optional) request timeout.
"""
conn = await self._get_best_node()
return await cache_remove_all_async(conn, self.cache_info)
@status_to_exception(CacheError)
- async def remove_if_equals(self, key, sample, key_hint=None,
sample_hint=None):
+ async def remove_if_equals(self, key, sample, key_hint=None,
sample_hint=None, timeout: Union[int, float] = 0):
"""
Removes an entry with a given key if provided value is equal to
actual value, notifying listeners and cache writers.
@@ -429,7 +457,8 @@ class AioCache(BaseCache):
:param key_hint: (optional) Ignite data type, for which the given key
should be converted,
:param sample_hint: (optional) Ignite data type, for whic
- the given sample should be converted.
+ the given sample should be converted,
+ :param timeout: (optional) request timeout.
"""
if key_hint is None:
key_hint = AnyDataObject.map_python_type(key)
@@ -438,7 +467,8 @@ class AioCache(BaseCache):
return await cache_remove_if_equals_async(conn, self.cache_info, key,
sample, key_hint, sample_hint)
@status_to_exception(CacheError)
- async def replace_if_equals(self, key, sample, value, key_hint=None,
sample_hint=None, value_hint=None) -> Any:
+ async def replace_if_equals(self, key, sample, value, key_hint=None,
sample_hint=None, value_hint=None,
+ timeout: Union[int, float] = 0) -> Any:
"""
Puts a value with a given key to cache only if the key already exists
and value equals provided sample.
@@ -452,6 +482,7 @@ class AioCache(BaseCache):
the given sample should be converted
:param value_hint: (optional) Ignite data type, for which the given
value should be converted,
+ :param timeout: (optional) request timeout,
:return: boolean `True` when key is present, `False` otherwise.
"""
if key_hint is None:
@@ -464,19 +495,21 @@ class AioCache(BaseCache):
return result
@status_to_exception(CacheError)
- async def get_size(self, peek_modes=None):
+ async def get_size(self, peek_modes=None, timeout: Union[int, float] = 0):
"""
Gets the number of entries in cache.
:param peek_modes: (optional) limit count to near cache partition
(PeekModes.NEAR), primary cache (PeekModes.PRIMARY), or backup cache
(PeekModes.BACKUP). Defaults to primary cache partitions
(PeekModes.PRIMARY),
+ :param timeout: (optional) request timeout,
:return: integer number of cache entries.
"""
conn = await self._get_best_node()
return await cache_get_size_async(conn, self.cache_info, peek_modes)
- def scan(self, page_size: int = 1, partitions: int = -1, local: bool =
False) -> AioScanCursor:
+ def scan(self, page_size: int = 1, partitions: int = -1, local: bool =
False,
+ timeout: Union[int, float] = 0) -> AioScanCursor:
"""
Returns all key-value pairs from the cache, similar to `get_all`, but
with internal pagination, which is slower, but safer.
diff --git a/pyignite/aio_cluster.py b/pyignite/aio_cluster.py
index afbc41b..8f7aa27 100644
--- a/pyignite/aio_cluster.py
+++ b/pyignite/aio_cluster.py
@@ -36,7 +36,7 @@ class AioCluster:
"""
self._client = client
- @status_to_exception(ClusterError)
+ @status_to_exception(ClusterError, ignore_timeout=True)
async def get_state(self) -> 'ClusterState':
"""
Gets current cluster state.
@@ -48,7 +48,7 @@ class AioCluster:
"""
return await cluster_get_state_async(await self._client.random_node())
- @status_to_exception(ClusterError)
+ @status_to_exception(ClusterError, ignore_timeout=True)
async def set_state(self, state: 'ClusterState'):
"""
Changes current cluster state to the given.
diff --git a/pyignite/connection/aio_connection.py
b/pyignite/connection/aio_connection.py
index 4d13d6e..13ab681 100644
--- a/pyignite/connection/aio_connection.py
+++ b/pyignite/connection/aio_connection.py
@@ -57,7 +57,8 @@ class BaseProtocol(asyncio.Protocol):
try:
self.__send_handshake(transport, self._conn)
except Exception as e:
- self._handshake_fut.set_exception(e)
+ if not self._handshake_fut.done():
+ self._handshake_fut.set_exception(e)
def data_received(self, data: bytes) -> None:
self._buffer += data
@@ -67,7 +68,7 @@ class BaseProtocol(asyncio.Protocol):
if not self._handshake_fut.done():
hs_response = self.__parse_handshake(packet, self._conn.client)
self._handshake_fut.set_result(hs_response)
- else:
+ elif not self._handshake_fut.cancelled() or not
self._handshake_fut.exception():
self._conn.process_message(packet)
self._buffer = self._buffer[packet_sz:len(self._buffer)]
@@ -203,7 +204,8 @@ class AioConnection(BaseConnection):
def process_connection_lost(self, err, reconnect=False):
self.failed = True
for _, fut in self._pending_reqs.items():
- fut.set_exception(err)
+ if not fut.done():
+ fut.set_exception(err)
self._pending_reqs.clear()
if self._transport_closed_fut and not
self._transport_closed_fut.done():
@@ -215,8 +217,11 @@ class AioConnection(BaseConnection):
def process_message(self, data):
req_id = int.from_bytes(data[4:12], byteorder=PROTOCOL_BYTE_ORDER,
signed=True)
- if req_id in self._pending_reqs:
- self._pending_reqs[req_id].set_result(data)
+
+ req_fut = self._pending_reqs.get(req_id)
+ if req_fut:
+ if not req_fut.done():
+ req_fut.set_result(data)
del self._pending_reqs[req_id]
async def _connect_version(self) -> Union[dict, OrderedDict]:
diff --git a/pyignite/utils.py b/pyignite/utils.py
index 5fcbd38..ed0b5c6 100644
--- a/pyignite/utils.py
+++ b/pyignite/utils.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import asyncio
import ctypes
import decimal
import inspect
@@ -225,12 +225,13 @@ def datetime_hashcode(value: int) -> int:
return (value & LONG_MASK) ^ (unsigned(value, ctypes.c_ulonglong) >> 32)
-def status_to_exception(exc: Type[Exception]):
+def status_to_exception(exc: Type[Exception], ignore_timeout=False):
"""
- Converts erroneous status code with error message to an exception
- of the given class. Supports coroutines.
+ Converts erroneous status code with error message to an exception
+ of the given class. Supports coroutines. Also supports the `timeout`
+ argument for decorated async functions.
:param exc: the class of exception to raise,
+ :param ignore_timeout: If set, ignore `timeout` argument.
:return: decorated function.
"""
def process_result(result):
@@ -242,7 +243,14 @@ def status_to_exception(exc: Type[Exception]):
if inspect.iscoroutinefunction(fn):
@wraps(fn)
async def ste_wrapper_async(*args, **kwargs):
- return process_result(await fn(*args, **kwargs))
+ timeout = kwargs.pop('timeout', 0)
+ if timeout and not ignore_timeout:
+ result = await asyncio.wait_for(fn(*args, **kwargs),
timeout)
+ else:
+ result = await fn(*args, **kwargs)
+
+ return process_result(result)
+
return ste_wrapper_async
else:
@wraps(fn)
diff --git a/requirements/install.txt b/requirements/install.txt
index aa8290f..feb4eb6 100644
--- a/requirements/install.txt
+++ b/requirements/install.txt
@@ -1,4 +1,3 @@
# these pip packages are necessary for the pyignite to run
attrs>=20.3.0
-contextvars>=2.4;python_version<"3.7"
diff --git a/requirements/tests.txt b/requirements/tests.txt
index 7262fe9..5dc815a 100644
--- a/requirements/tests.txt
+++ b/requirements/tests.txt
@@ -1,7 +1,6 @@
# these packages are used for testing
-async_generator==1.10; python_version < '3.7'
-pytest==6.2.2
+pytest==6.2.5
pytest-cov==2.11.1
pytest-asyncio==0.14.0
teamcity-messages==1.28
diff --git a/setup.py b/setup.py
index 91a72f5..827066a 100644
--- a/setup.py
+++ b/setup.py
@@ -110,6 +110,7 @@ def run_setup(with_binary=True):
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
+ 'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3 :: Only',
'Intended Audience :: Developers',
'Topic :: Database :: Front-Ends',
diff --git a/tests/affinity/test_affinity_request_routing.py
b/tests/affinity/test_affinity_request_routing.py
index b73eff3..fbb4b82 100644
--- a/tests/affinity/test_affinity_request_routing.py
+++ b/tests/affinity/test_affinity_request_routing.py
@@ -28,11 +28,6 @@ from pyignite.datatypes.prop_codes import PROP_NAME,
PROP_BACKUPS_NUMBER, PROP_C
from pyignite.monitoring import QueryEventListener
from tests.util import wait_for_condition, wait_for_condition_async,
start_ignite, kill_process_tree
-try:
- from contextlib import asynccontextmanager
-except ImportError:
- from async_generator import asynccontextmanager
-
requests = deque()
@@ -401,7 +396,7 @@ def create_caches(client):
pass
-@asynccontextmanager
+@contextlib.asynccontextmanager
async def create_caches_async(client):
caches = []
try:
diff --git a/tests/custom/test_timeouts.py b/tests/custom/test_timeouts.py
new file mode 100644
index 0000000..ef01af1
--- /dev/null
+++ b/tests/custom/test_timeouts.py
@@ -0,0 +1,273 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import sys
+from asyncio import TimeoutError, InvalidStateError
+
+import pytest
+
+from pyignite import AioClient
+from pyignite.aio_cache import AioCache
+from pyignite.datatypes.key_value import PeekModes
+from tests.util import start_ignite_gen
+
+
+@pytest.fixture(scope='module', autouse=True)
+def server1():
+    """
+    Module-scoped, auto-used fixture that delegates to
+    ``start_ignite_gen(idx=1)`` (presumably starts Ignite node 1 and stops
+    it on teardown -- confirm against ``tests.util``).
+    """
+    yield from start_ignite_gen(idx=1)
+
+
+@pytest.fixture(autouse=True)
+async def proxy(event_loop, server1, cache):
+    """
+    Provide a started ``ProxyServer`` that listens on 127.0.0.1:10802 and
+    forwards to 127.0.0.1:10801. The proxy is closed after every test.
+    """
+    proxy = ProxyServer(("127.0.0.1", 10802), ("127.0.0.1", 10801))
+    try:
+        await proxy.start()
+        yield proxy
+    finally:
+        await proxy.close()
+
+
+@pytest.fixture(scope='module', autouse=True)
+async def cache(server1):
+    """
+    Create the ``test`` cache through a direct connection to
+    127.0.0.1:10801 and destroy it when the module is finished.
+    """
+    c = AioClient(partition_aware=False)
+    async with c.connect("127.0.0.1", 10801):
+        # Create the cache outside of ``try``: if creation fails, the
+        # original error must surface instead of being masked by an
+        # ``UnboundLocalError`` raised from the cleanup code.
+        cache = await c.get_or_create_cache("test")
+        try:
+            yield cache
+        finally:
+            await cache.destroy()
+
+
+@pytest.fixture(autouse=True)
+def invalid_states_errors():
+    """
+    Collect every ``InvalidStateError`` raised while a test is running.
+
+    A trace function is installed with ``sys.settrace`` and records the
+    ``(type, value, traceback)`` triple of each 'exception' event whose type
+    is ``InvalidStateError``, including exceptions that are handled later.
+
+    :return: list of recorded exception triples.
+    """
+    errors = []
+
+    def trace(_, event, arg):
+        if event == 'exception':
+            etype, _, _ = arg
+            if etype is InvalidStateError:
+                errors.append(arg)
+
+        # Returning the function itself keeps tracing nested scopes.
+        return trace
+
+    # Remember the tracer that was active before (e.g. coverage or a
+    # debugger) and restore it afterwards instead of dropping it.
+    previous_trace = sys.gettrace()
+    try:
+        sys.settrace(trace)
+        yield errors
+    finally:
+        sys.settrace(previous_trace)
+
+
+# Argument names and values unpacked into the parametrizing decorator of the
+# timeout tests: every entry pairs an unbound ``AioCache`` coroutine method
+# with the positional arguments passed to it after the cache instance.
+cache_method_params = (
+ "method,targs",
+ [
+ (AioCache.get, (1,)),
+ (AioCache.put, (1, 1)),
+ (AioCache.get_all, ([1, 1],)),
+ (AioCache.put_all, ({1: 1},)),
+ (AioCache.replace, (1, 1)),
+ (AioCache.clear, ()),
+ (AioCache.clear_key, (1,)),
+ (AioCache.clear_keys, ([1, 1],)),
+ (AioCache.contains_key, (1,)),
+ (AioCache.contains_keys, ([1, 1],)),
+ (AioCache.get_and_put, (1, 1)),
+ (AioCache.get_and_put_if_absent, (1, 1)),
+ (AioCache.put_if_absent, (1, 1)),
+ (AioCache.get_and_remove, (1,)),
+ (AioCache.get_and_replace, (1, 1)),
+ (AioCache.remove_key, (1,)),
+ (AioCache.remove_keys, ([1, 1],)),
+ (AioCache.remove_all, ()),
+ (AioCache.remove_if_equals, (1, 1)),
+ (AioCache.replace_if_equals, (1, 1, 1)),
+ (AioCache.get_size, ([PeekModes.PRIMARY, PeekModes.BACKUP],)),
+ (AioCache.get_size, ()),
+ (AioCache.settings, ())
+ ]
+)
+
+
+@pytest.mark.parametrize(*cache_method_params)
+@pytest.mark.asyncio
+async def test_cancellation_on_slow_response(event_loop, proxy, invalid_states_errors,
+                                             method, targs):
+    """
+    A cache operation called with ``timeout=0.1`` through a proxy that delays
+    responses must raise ``TimeoutError`` without any ``InvalidStateError``
+    being raised along the way.
+    """
+    c = AioClient(partition_aware=False)
+    async with c.connect("127.0.0.1", 10802):
+        cache = await c.get_cache("test")
+
+        proxy.slow_response = True
+        with pytest.raises(TimeoutError):
+            await method(cache, *targs, timeout=0.1)
+
+        proxy.slow_response = False
+        assert len(invalid_states_errors) == 0
+
+
+@pytest.mark.parametrize(*cache_method_params)
+@pytest.mark.asyncio
+async def test_cancellation_on_disconnect(event_loop, proxy, invalid_states_errors,
+                                          method, targs):
+    """
+    A cache operation whose response is discarded by the proxy must end with
+    ``TimeoutError``; dropping the connection after the timeout has elapsed
+    must not raise ``InvalidStateError``.
+    """
+    c = AioClient(partition_aware=False)
+    async with c.connect("127.0.0.1", 10802):
+        cache = await c.get_cache("test")
+        proxy.discard_response = True
+
+        result = asyncio.ensure_future(method(cache, *targs, timeout=0.1))
+        # Sleep longer than the request timeout, so the request is already
+        # timed out when the connection is dropped.
+        await asyncio.sleep(0.2)
+        await proxy.disconnect_peers()
+
+        with pytest.raises(TimeoutError):
+            await result
+
+        assert len(invalid_states_errors) == 0
+
+
+class ProxyServer:
+    """
+    Proxy for simulating a discarded or slow response from an Ignite server.
+
+    Set `discard_response` or `slow_response` to `True` to simulate the
+    corresponding condition.
+    Set `slow_response_timeout` to change the sleep interval for slow
+    responses.
+    Call `disconnect_peers()` in order to simulate a lost connection to the
+    Ignite server.
+    """
+    def __init__(self, local_host, remote_host):
+        """
+        :param local_host: (host, port) tuple the proxy listens on,
+        :param remote_host: (host, port) tuple the traffic is forwarded to.
+        """
+        self.local_host = local_host
+        self.remote_host = remote_host
+        # Client peername -> (ProxyTcpProtocol, RemoteTcpProtocol) pair.
+        self.peers = {}
+        self.discard_response, self.slow_response = False, False
+        self.slow_response_timeout = 0.1
+        self.server = None
+
+    async def start(self):
+        """Start listening on `local_host`."""
+        loop = asyncio.get_event_loop()
+        host, port = self.local_host
+        self.server = await loop.create_server(
+            lambda: ProxyTcpProtocol(self), host=host, port=port)
+
+    async def disconnect_peers(self):
+        """Close both sides of every proxied connection registered so far."""
+        # Work on a snapshot: the mapping can change while we are awaiting.
+        for conn_info, pair in list(self.peers.items()):
+            # Drop the entry, so a later call does not close it once more.
+            self.peers.pop(conn_info, None)
+            if not pair:
+                # Skip an empty entry, but keep closing the remaining peers.
+                continue
+
+            local, remote = pair
+            if remote:
+                await remote.close()
+            if local:
+                await local.close()
+
+    async def close(self):
+        """Disconnect all peers and stop accepting new connections."""
+        try:
+            await self.disconnect_peers()
+        except TimeoutError:
+            # Best effort: the listening socket is closed anyway.
+            pass
+
+        # `start()` may have failed or may not have been called at all.
+        if self.server is not None:
+            self.server.close()
+
+
+class ProxyTcpProtocol(asyncio.Protocol):
+    """
+    Client-facing side of the proxy. Data received from the client is
+    forwarded to the remote host; the remote connection is opened lazily on
+    the first received chunk.
+    """
+    def __init__(self, proxy):
+        """
+        :param proxy: owning `ProxyServer`.
+        """
+        self.addr, self.port = proxy.remote_host
+        self.proxy = proxy
+        self.transport, self.remote_protocol = None, None
+        self.conn_info, self.close_fut = None, None
+        super().__init__()
+
+    def connection_made(self, transport):
+        self.transport = transport
+        self.conn_info = transport.get_extra_info("peername")
+
+    def data_received(self, data):
+        if self.remote_protocol and self.remote_protocol.transport:
+            self.remote_protocol.transport.write(data)
+            return
+
+        # No established remote connection yet: open one. The first chunk is
+        # handed over to the remote protocol, which sends it once connected.
+        loop = asyncio.get_event_loop()
+        self.remote_protocol = RemoteTcpProtocol(self.proxy, self, data)
+        coro = loop.create_connection(lambda: self.remote_protocol, host=self.addr, port=self.port)
+        asyncio.ensure_future(coro)
+
+        self.proxy.peers[self.conn_info] = (self, self.remote_protocol)
+
+    async def close(self):
+        """Close the client connection, waiting up to 0.1 s for it to drop."""
+        if not self.transport:
+            return
+
+        self.close_fut = asyncio.get_event_loop().create_future()
+        self.transport.close()
+
+        try:
+            await asyncio.wait_for(self.close_fut, 0.1)
+        except TimeoutError:
+            pass
+
+    def connection_lost(self, exc):
+        # `Future.done()` only queries the state; the future has to be
+        # resolved explicitly to wake up `close()`. It can be cancelled
+        # already if `close()` has timed out, hence the `done()` guard.
+        if self.close_fut and not self.close_fut.done():
+            self.close_fut.set_result(None)
+
+
+class RemoteTcpProtocol(asyncio.Protocol):
+    """
+    Server-facing side of the proxy. Responses received from the remote host
+    are queued and written back to the client by a background task, which
+    optionally delays or discards them according to the proxy flags.
+    """
+    def __init__(self, proxy, proxy_protocol, data):
+        """
+        :param proxy: owning `ProxyServer`,
+        :param proxy_protocol: client-facing protocol to write responses to,
+        :param data: first chunk to send as soon as the connection is made.
+        """
+        self.proxy = proxy
+        self.proxy_protocol = proxy_protocol
+        self.data = data
+        self.transport, self.close_fut = None, None
+        self.queue = []
+        self.stop_flag = False
+        self.loop_task = asyncio.ensure_future(self.send_loop())
+        super().__init__()
+
+    def connection_made(self, transport):
+        self.transport = transport
+        self.transport.write(self.data)
+
+    async def close(self):
+        """Stop the send loop and close the remote connection."""
+        # Stop the background task even if the connection was never made,
+        # otherwise it would keep polling the queue forever.
+        self.stop_flag = True
+        if not self.transport:
+            return
+
+        self.close_fut = asyncio.get_event_loop().create_future()
+        self.transport.close()
+        try:
+            await asyncio.wait_for(self.loop_task, 0.1)
+        except TimeoutError:
+            pass
+
+        try:
+            await asyncio.wait_for(self.close_fut, 0.1)
+        except TimeoutError:
+            pass
+
+    def connection_lost(self, exc):
+        # `Future.done()` only queries the state; the future has to be
+        # resolved explicitly to wake up `close()`. It can be cancelled
+        # already if `close()` has timed out, hence the `done()` guard.
+        if self.close_fut and not self.close_fut.done():
+            self.close_fut.set_result(None)
+
+    async def send_loop(self):
+        """Forward queued responses to the client until stopped."""
+        while not self.stop_flag:
+            if not self.queue:
+                await asyncio.sleep(0.01)
+                continue
+
+            # Take the oldest packet first to keep the order of responses.
+            packet = self.queue.pop(0)
+            if packet:
+                if self.proxy.slow_response:
+                    await asyncio.sleep(self.proxy.slow_response_timeout)
+                self.proxy_protocol.transport.write(packet)
+
+    def data_received(self, data):
+        if self.proxy.discard_response:
+            return
+
+        self.queue.append(data)
diff --git a/tests/util.py b/tests/util.py
index af3b70e..6101a09 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -27,12 +27,6 @@ import subprocess
import time
-try:
- from contextlib import asynccontextmanager
-except ImportError:
- from async_generator import asynccontextmanager
-
-
@contextlib.contextmanager
def get_or_create_cache(client, settings):
cache = client.get_or_create_cache(settings)
@@ -42,7 +36,7 @@ def get_or_create_cache(client, settings):
cache.destroy()
-@asynccontextmanager
+@contextlib.asynccontextmanager
async def get_or_create_cache_async(client, settings):
cache = await client.get_or_create_cache(settings)
try:
@@ -114,15 +108,6 @@ def get_ignite_runner():
raise Exception(f"Ignite not found. IGNITE_HOME
{os.getenv('IGNITE_HOME')}")
-def get_ignite_config_path(use_ssl=False):
- if use_ssl:
- file_name = "ignite-config-ssl.xml"
- else:
- file_name = "ignite-config.xml.jinja2"
-
- return os.path.join(get_test_dir(), "config", file_name)
-
-
def check_server_started(idx=1):
pattern = re.compile('^Topology snapshot.*')
diff --git a/tox.ini b/tox.ini
index e873e21..d68f02e 100644
--- a/tox.ini
+++ b/tox.ini
@@ -15,7 +15,7 @@
[tox]
skipsdist = True
-envlist = codestyle,py{37,38,39,310}
+envlist = codestyle,py{37,38,39,310,311}
[pytest]
log_format = %(asctime)s %(name)s %(levelname)s %(message)s
@@ -43,6 +43,6 @@ usedevelop = True
commands =
pytest {env:PYTESTARGS:} {posargs} --force-cext --examples
-[testenv:py{36,37,38,39}-jenkins]
+[testenv:py{37,38,39,310,311}-jenkins]
setenv:
PYTESTARGS = --junitxml=junit-{envname}.xml