This is an automated email from the ASF dual-hosted git repository.
Cole-Greer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/master by this push:
new 86147b9e68 Standardized `gremlin-python` connection options (#3469)
86147b9e68 is described below
commit 86147b9e684ea1508b02d3525996558c22e369d4
Author: Guian Gumpac <[email protected]>
AuthorDate: Fri Jun 26 19:46:51 2026 -0700
Standardized `gremlin-python` connection options (#3469)
# Standardize `gremlin-python` connection options
Implements the Python portion of the TinkerPop 4.x GLV connection-options
standardization. Renames two options to canonical names (old names kept as
deprecated aliases), adds a set of new options, removes two, and raises the
minimum `aiohttp` floor. Python driver changes only; the other GLVs follow in
separate PRs.
**Proposal:**
https://lists.apache.org/thread/yqtr2wnb1kq2pqqq4002cz511q5o0bkg
## Renames (deprecated aliases retained)
| Old | New | Default |
| ------------- | ----------------- | ------------ |
| `pool_size` | `max_connections` | 128 (was 8) |
| `ssl_options` | `ssl` | - |
The old names still work but emit a `DeprecationWarning` (marked "As of
release 4.0.0, ...") and should be migrated. `max_connections` is now also
applied to the aiohttp `TCPConnector` `limit`, so the transport layer reflects
the option in addition to sizing the connection pool.
## Behavior changes (breaking)
* **`compression`** is a new option defaulting to `'deflate'` (on); the
driver advertises `Accept-Encoding: deflate` by default. Set
`compression='none'` to disable, which also suppresses aiohttp's automatic
`Accept-Encoding` injection so compression is not silently negotiated.
Defaulting on is a deliberate deviation from the proposal's agreed default-off,
applied consistently across the GLVs by later agreement.
* **`connect_timeout`** defaults to 5s. Combined with the existing
`read_timeout`, it is wired into a single aiohttp `ClientTimeout` via the
socket-level `sock_connect`/`sock_read` knobs (not a whole-request `total`), so
long but legitimate streaming responses are not aborted while a stalled server
no longer hangs forever.
* The minimum supported **`aiohttp`** is raised to `3.11` (required for the
`socket_factory` used by `keep_alive_time`).
## New options
* **`connect_timeout`** (5s) - socket-connect timeout (the existing
`read_timeout` was rewired into the unified `ClientTimeout`).
* **`idle_timeout`** (180s) - mapped to the aiohttp `TCPConnector`
keep-alive timeout.
* **`keep_alive_time`** (30s) - enables TCP keep-alive probes via the
connector socket factory (`TCP_KEEPIDLE`/`TCP_KEEPALIVE`, guarded by platform
availability).
* **`compression`** (`'none'`/`'deflate'`, default `'deflate'`) - the wire
compression negotiated with the server.
* **`default_batch_size`** (64) - connection-level default that fills the
per-request `batchSize` when unset.
* **`proxy`** and **`trust_env`** - explicit HTTP proxy and
environment-trust options for the aiohttp transport.
* **`auth.sigv4`** gains a credentials-provider variant accepting an
optional credentials provider or callable, falling back to the AWS environment
variables.
## Removed (breaking)
* **`max_content_length`** - previously accepted but discarded.
* **`headers`** kwarg on `Client`/`DriverRemoteConnection` - custom headers
must now be set via an interceptor.
## Bug fix
* Fixed `Client.submit`/`submit_async` mutating a caller-supplied
`RequestMessage` in place; the message fields are now cloned before applying
`request_options`/`default_batch_size`, so resubmitting the same message no
longer accumulates state (matching the no-mutate contract of the .NET/JS
drivers). This is required for the new `default_batch_size` to fill safely.
## Testing
* `gremlin-python` unit tests pass, including new `test_client_options`,
`test_connection_options`, and `test_transport_compression` suites. The
compression suite uses an in-process aiohttp loopback server to verify real
`Accept-Encoding` negotiation and transparent deflate decompression over a
socket.
* CHANGELOG, reference config table (`gremlin-variants.asciidoc`), and
upgrade guide (`release-4.x.x.asciidoc`) updated for the Python slice.
## Notes
* The minimum `aiohttp` is raised from `3.8` to `3.11` (`pyproject.toml`).
This is a dependency-floor bump, required for the
`TCPConnector(socket_factory=...)` API that backs `keep_alive_time`; users
pinned to `aiohttp` 3.8-3.10 must upgrade.
Assisted-by: Kiro: Claude Opus 4.8
---
CHANGELOG.asciidoc | 16 +-
docs/src/reference/gremlin-variants.asciidoc | 49 +++-
docs/src/upgrade/release-4.x.x.asciidoc | 53 +++-
.../src/main/python/examples/connections.py | 14 +-
.../gremlin_python/driver/aiohttp/transport.py | 186 ++++++++++--
.../src/main/python/gremlin_python/driver/auth.py | 49 +++-
.../main/python/gremlin_python/driver/client.py | 34 ++-
.../python/gremlin_python/driver/connection.py | 7 +-
.../driver/driver_remote_connection.py | 14 +-
gremlin-python/src/main/python/pyproject.toml | 2 +-
.../src/main/python/tests/integration/conftest.py | 4 +-
.../python/tests/integration/driver/test_client.py | 47 +++-
.../integration/driver/test_client_behavior.py | 2 +-
.../test_driver_remote_connection_threaded.py | 2 +-
.../tests/unit/driver/test_client_options.py | 264 +++++++++++++++++
.../tests/unit/driver/test_connection_options.py | 312 +++++++++++++++++++++
16 files changed, 976 insertions(+), 79 deletions(-)
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 31085a634c..d2e0ad55cf 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -25,6 +25,21 @@
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
[[release-4-0-0]]
=== TinkerPop 4.0.0 (Release Date: NOT OFFICIALLY RELEASED YET)
+* Standardized `gremlin-python` connection options per the TinkerPop 4.x GLV
proposal:
+** Renamed `pool_size` to `max_connections` (breaking; the old name has been
removed) and changed the default from 8 to 128; `max_connections` is now also
applied to the aiohttp `TCPConnector` `limit` so the transport layer reflects
the option in addition to sizing the Connection pool.
+** Renamed `ssl_options` to `ssl` accepting an `ssl.SSLContext` (breaking; the
old name has been removed).
+** Added `connect_timeout` (default 5s) and rewired the existing
`read_timeout` into a single aiohttp `ClientTimeout`
(`sock_connect`/`sock_read`); the socket-level knobs are used rather than a
whole-request `total` so long but legitimate streaming responses are not
aborted, while a stalled server no longer hangs forever.
+** Added `idle_timeout` (default 180s) mapped to the aiohttp `TCPConnector`
keep-alive timeout.
+** Added `keep_alive_time` (default 30s) enabling TCP keep-alive probes via
the connector socket factory (`TCP_KEEPIDLE`/`TCP_KEEPALIVE` guarded by
platform availability); this required raising the minimum `aiohttp` to `3.11`
(the `socket_factory` floor). *(breaking)*
+** Each timeout option is named with a millisecond suffix as the primary form
(`connect_timeout_millis`, `idle_timeout_millis`, `read_timeout_millis`,
`keep_alive_time_millis`); the unsuffixed canonical name (`connect_timeout`,
etc.) accepts the idiomatic seconds value. Both resolve to seconds internally
for aiohttp.
+** Added `compression` accepting `'none'`/`'deflate'`, default `'deflate'`
(on), advertising `Accept-Encoding: deflate` by default; when set to `'none'`,
aiohttp's automatic `Accept-Encoding` injection is suppressed so compression is
not silently negotiated.
+** Added `batch_size` (default 64) connection-level default that fills the
per-request `batchSize` when unset.
+** Added explicit HTTP `proxy` and `trust_env` options surfaced on the aiohttp
`ClientSession`.
+** Added a credentials-provider variant to `auth.sigv4` that accepts an
optional credentials provider or callable, falling back to the AWS environment
variables.
+** Removed the `max_content_length` kwarg from the `gremlin-python` driver
(previously accepted but discarded).
+** Removed the standalone `headers` kwarg from `gremlin-python`
`Client`/`DriverRemoteConnection`; custom headers must now be set via
interceptors.
+** Fixed `gremlin-python` `Client.submit`/`submit_async` mutating a
caller-supplied `RequestMessage` in place; the message fields are now cloned
before applying `request_options`/`batch_size`, so resubmitting the same
message no longer accumulates state, matching the no-mutate contract of the
.NET/JS drivers.
+** Fixed `gremlin-python` `Client.submit`/`submit_async` mutating a
caller-supplied `RequestMessage` in place; the message fields are now cloned
before applying `request_options`/`default_batch_size`, so resubmitting the
same message no longer accumulates state, matching the no-mutate contract of
the .NET/JS drivers.
* Standardized `gremlin-driver` (Java) connection options
** Renamed the `connectionSetupTimeoutMillis` builder option to
`connectTimeout` (default lowered from 15s to 5s) and actually wired it to
`ChannelOption.CONNECT_TIMEOUT_MILLIS` on the connection bootstrap; it
previously was validated but never applied. The old name is removed. Settable
as `connectTimeout(Duration)` or `connectTimeoutMillis(int)` (YAML key
`connectionPool.connectTimeoutMillis`). *(breaking)*
** Renamed `maxConnectionPoolSize` to `maxConnections` (default 128),
`idleConnectionTimeoutMillis` to `idleTimeout` (default 180s), and
`resultIterationBatchSize` to `batchSize` (default 64; the connection-level
default that fills a request's per-request `batchSize` when unset, mirroring
how `bulkResults` shares one name across scopes); the old builder methods,
accessors, and YAML keys are removed. *(breaking)*
@@ -61,7 +76,6 @@
image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
* Added `Character`, `Binary`, `Duration` to the `gremlin-lang` grammar, GLVs,
and Translators.
* Refactored `gremlin-python` receive path to stream results directly from the
HTTP response through the configured `response_serializer`.
* Replaced `read()` with `get_stream()` on `AbstractBaseTransport` in
`gremlin-python` for streaming deserialization.
-* Removed `max_content_length` from `gremlin-python` driver (no longer
applicable with streaming deserialization).
* Removed GraphSON parameterized test fixtures from `gremlin-python`
integration tests (GraphBinary only).
* Removed `AbstractBaseTransport`, `AbstractBaseProtocol`, and
`GremlinServerHTTPProtocol` from `gremlin-python`, merging protocol logic into
`Connection`.
* Removed `GraphSONSerializersV4` and the
`gremlin_python.structure.io.graphsonV4` module from `gremlin-python`.
GraphBinary is the only supported wire format.
diff --git a/docs/src/reference/gremlin-variants.asciidoc
b/docs/src/reference/gremlin-variants.asciidoc
index b78f894aa8..3cca5fe43c 100644
--- a/docs/src/reference/gremlin-variants.asciidoc
+++ b/docs/src/reference/gremlin-variants.asciidoc
@@ -3177,13 +3177,15 @@ the "g" provided to the `DriverRemoteConnection`
corresponds to the name of a `G
g =
traversal().with_(DriverRemoteConnection('http://localhost:8182/gremlin','g'))
----
-If you need to send additional headers in the HTTP connection, you can pass an
optional `headers` parameter
-to the `DriverRemoteConnection` constructor.
+If you need to send additional headers on each HTTP request, set them via a
request interceptor rather than a
+connection parameter. Pass a callable to the `interceptors` keyword argument
that mutates the request headers in
+place. See <<gremlin-python-interceptors, RequestInterceptor>> for details.
[source,python]
----
g = traversal().with_(DriverRemoteConnection(
- 'http://localhost:8182/gremlin', 'g', headers={'Header':'Value'}))
+ 'http://localhost:8182/gremlin', 'g',
+ interceptors=[lambda req: req.headers.update({'Header': 'Value'})]))
----
Gremlin-Python contains an `auth` module that provides built-in authentication
functions. These are passed to the
@@ -3207,7 +3209,7 @@ with 'https://'. If Gremlin-Server uses a self-signed
certificate for SSL, Greml
the CA certificate file (in openssl .pem format), to be specified in the
SSL_CERT_FILE environment variable.
NOTE: If connecting from an inherently single-threaded Python process where
blocking while waiting for Gremlin
-traversals to complete is acceptable, it might be helpful to set `pool_size`
and `max_workers` parameters to 1.
+traversals to complete is acceptable, it might be helpful to set
`max_connections` and `max_workers` parameters to 1.
See the <<python-configuration,Configuration>> section just below. Examples
where this could apply are serverless cloud functions or WSGI
worker processes.
@@ -3311,13 +3313,21 @@ can be passed to the `Client` or
`DriverRemoteConnection` instance as keyword ar
[width="100%",cols="3,10,^2",options="header"]
|=========================================================
|Key |Description |Default
-|headers |Additional headers that will be added to each request message.
|`None`
-|max_workers |Maximum number of worker threads. |Number of CPUs * 5
-|request_serializer |The request serializer
implementation.|`gremlin_python.driver.serializer.GraphBinarySerializersV4`
+|max_workers |Maximum number of worker threads. |Same as `max_connections`
(128)
|response_serializer |The response serializer
implementation.|`gremlin_python.driver.serializer.GraphBinarySerializersV4`
-|interceptors |The request interceptors to run after request
serialization.|`None`
+|interceptors |The request interceptors to run before the request body is
serialized.|`None`
|auth |An authentication interceptor. Always appended to the end of the
interceptor list so it runs last. |`None`
-|pool_size |The number of connections used by the pool. |4
+|max_connections |The maximum number of connections used by the pool. |128
+|connect_timeout_millis |Timeout in milliseconds for establishing the
connection (TCP connect plus TLS handshake). Also settable as `connect_timeout`
(in seconds). |5000
+|read_timeout_millis |Per-read idle timeout in milliseconds applied while
streaming a response. Resets per chunk. Also settable as `read_timeout` (in
seconds). |`None`
+|write_timeout |Timeout in seconds for writing a request to the transport.
|`None`
+|ssl |An `ssl.SSLContext` used for TLS connections. |`None`
+|idle_timeout_millis |How long in milliseconds an idle connection remains in
the pool before being closed. Also settable as `idle_timeout` (in seconds).
|180000
+|keep_alive_time_millis |TCP keep-alive idle time in milliseconds before
probes begin on an otherwise idle connection. Also settable as
`keep_alive_time` (in seconds). |30000
+|compression |The wire compression negotiated with the server (`'none'` or
`'deflate'`). |`'deflate'`
+|batch_size |The connection-level default batch size used when a request does
not specify one. |64
+|proxy |HTTP proxy URL routed through the underlying `ClientSession`. |`None`
+|trust_env |Whether to trust environment variables for proxy and SSL
configuration. |False
|enable_user_agent_on_connect |Enables sending a user agent to the server
during connection requests.
More details can be found in provider docs
link:https://tinkerpop.apache.org/docs/x.y.z/dev/provider/#_graph_driver_provider_requirements[here].|True
@@ -3334,10 +3344,29 @@ import ssl
...
g = traversal().with_(
DriverRemoteConnection('https://localhost:8182/gremlin','g',
- ssl_options=ssl.create_default_context(),
+ ssl=ssl.create_default_context(),
read_timeout=30))
----
+Note that no driver timeout bounds the *total* duration of a request once it
is under way. `read_timeout` only bounds
+the gap between response chunks, so a response that keeps producing chunks
will not time out no matter how long it
+runs overall, and there is no client-side "overall" request timeout. If you
need an absolute deadline, impose it in
+your application. `submit_async()` and `ResultSet.all()` return
`concurrent.futures.Future` objects, so pass a
+`timeout` to `result()`:
+
+[source,python]
+----
+from concurrent.futures import TimeoutError
+
+# bound the entire request (submit plus full result iteration) to 30 seconds
+try:
+ result_set = client.submit_async("g.V().out().out()").result(timeout=30)
+ results = result_set.all().result(timeout=30)
+except TimeoutError:
+ # deadline exceeded; stop waiting on the request
+ ...
+----
+
[[gremlin-python-interceptors]]
=== RequestInterceptor
diff --git a/docs/src/upgrade/release-4.x.x.asciidoc
b/docs/src/upgrade/release-4.x.x.asciidoc
index 7bbe5d6438..1558bcd027 100644
--- a/docs/src/upgrade/release-4.x.x.asciidoc
+++ b/docs/src/upgrade/release-4.x.x.asciidoc
@@ -32,6 +32,57 @@ complete list of all the modifications that are part of this
release.
=== Upgrading for Users
+==== Standardizing Python Connection Options
+
+TinkerPop 4.x standardizes connection option names and defaults across the
GLVs. In `gremlin-python`, several
+connection options passed to `Client`/`DriverRemoteConnection` have been
renamed for consistency. Because this is a
+major version, the old names have been removed rather than retained as
aliases, and a number of new options have been
+added. The notes below describe the Python changes. See <<glv-driver-changes,
GLV Driver Changes>> for the equivalent changes in
+the other drivers.
+
+Renames (breaking). The following options have been renamed and the old names
removed. Migrate to the new names:
+
+- `pool_size` is now `max_connections` (default raised from 8 to 128).
`max_connections` is also applied to the
+ aiohttp `TCPConnector` `limit`, so the transport layer reflects the option
in addition to sizing the connection
+ pool.
+- `ssl_options` is now `ssl` (accepts an `ssl.SSLContext`).
+
+Behavior changes. These change runtime behavior on upgrade, even if you do not
change your configuration:
+
+- `compression` is a new option that defaults to `'deflate'` (on), so the
driver advertises `Accept-Encoding: deflate`
+ by default. Set `compression='none'` to disable it; when disabled, aiohttp's
automatic `Accept-Encoding` injection is
+ suppressed so compression is not silently negotiated. Defaulting compression
on is a deliberate deviation from the
+ proposal's agreed default-off, applied consistently across the GLVs by later
agreement.
+- `connect_timeout` now defaults to 5s. Together with `read_timeout` it is
wired into a single aiohttp `ClientTimeout`
+ via the socket-level `sock_connect`/`sock_read` knobs rather than a
whole-request `total`, so long but legitimate
+ streaming responses are not aborted while a stalled server no longer hangs
forever.
+- The minimum supported `aiohttp` has been raised to `3.11` (required for the
`socket_factory` used by
+ `keep_alive_time`). *(breaking)*
+
+New options:
+
+- `connect_timeout` (default 5s): a new socket-connect timeout. It is combined
with the existing `read_timeout`
+ (rewired into a single aiohttp `ClientTimeout` via
`sock_connect`/`sock_read`, as described above).
+ Each timeout option's primary form is the millisecond-suffixed name
(`connect_timeout_millis`, `idle_timeout_millis`,
+ `read_timeout_millis`, `keep_alive_time_millis`); the unsuffixed canonical
name (`connect_timeout`, etc.) accepts the
+ idiomatic seconds value.
+- `idle_timeout` (default 180s): mapped to the aiohttp `TCPConnector`
keep-alive timeout.
+- `keep_alive_time` (default 30s): enables TCP keep-alive probes via the
connector socket factory
+ (`TCP_KEEPIDLE`/`TCP_KEEPALIVE`, guarded by platform availability).
+- `compression` (`'none'`/`'deflate'`, default `'deflate'`): the wire
compression negotiated with the server.
+- `batch_size` (default 64): a connection-level default that fills the
per-request `batchSize` when unset.
+- `proxy` and `trust_env`: explicit HTTP proxy and environment-trust options
surfaced on the aiohttp `ClientSession`.
+- `auth.sigv4` gains a credentials-provider variant that accepts an optional
credentials provider or callable, falling
+ back to the AWS environment variables.
+
+Removals (breaking):
+
+- The `max_content_length` kwarg has been removed (it was previously accepted
but discarded).
+- The standalone `headers` kwarg has been removed from
`Client`/`DriverRemoteConnection`; custom headers must now be
+ set via interceptors.
+
+See:
link:https://lists.apache.org/thread/yqtr2wnb1kq2pqqq4002cz511q5o0bkg[[DISCUSS]
Standardizing GLV connection options in TinkerPop 4].
+
==== Standardizing Java Connection Options
TinkerPop 4.x standardizes connection option names and defaults across the
GLVs. In the Java reference driver
@@ -406,7 +457,7 @@ from gremlin_python.driver.aiohttp.transport import
AiohttpHTTPTransport
Client(url, 'g', transport_factory=lambda:
AiohttpHTTPTransport(ssl_options=ctx, read_timeout=30))
# After
-Client(url, 'g', ssl_options=ctx, read_timeout=30)
+Client(url, 'g', ssl=ctx, read_timeout=30)
----
The `Connection` constructor signature has changed. The `protocol` and
`transport_factory` positional arguments have
diff --git a/gremlin-python/src/main/python/examples/connections.py
b/gremlin-python/src/main/python/examples/connections.py
index a496d730df..21f8434c86 100644
--- a/gremlin-python/src/main/python/examples/connections.py
+++ b/gremlin-python/src/main/python/examples/connections.py
@@ -65,7 +65,7 @@ def with_auth():
ssl_opts.check_hostname = False
ssl_opts.verify_mode = ssl.CERT_NONE
rc = DriverRemoteConnection(server_url, 'g', auth=basic('stephen',
'password'),
- ssl_options=ssl_opts)
+ ssl=ssl_opts)
else:
rc = DriverRemoteConnection(server_url, 'g', auth=basic('stephen',
'password'))
@@ -80,9 +80,19 @@ def with_auth():
# connecting with customized configurations
def with_configs():
server_url = os.getenv('GREMLIN_SERVER_URL',
'http://localhost:8182/gremlin').format(45940)
+ # Custom headers are no longer a connection kwarg; set them via an
interceptor.
+ def add_headers(request):
+ request.headers['x-custom-header'] = 'example'
+
rc = DriverRemoteConnection(
server_url, 'g',
- headers=None,
+ max_connections=8,
+ connect_timeout_millis=5000,
+ idle_timeout_millis=180000,
+ keep_alive_time_millis=30000,
+ batch_size=64,
+ compression='none',
+ interceptors=[add_headers],
)
g = traversal().with_remote(rc)
diff --git
a/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py
b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py
index bdd100ce24..2367f8aaaa 100644
--- a/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py
+++ b/gremlin-python/src/main/python/gremlin_python/driver/aiohttp/transport.py
@@ -18,6 +18,7 @@
#
import aiohttp
import asyncio
+import socket
import sys
if sys.version_info >= (3, 11):
@@ -27,6 +28,81 @@ else:
__author__ = 'Lyndon Bauto ([email protected])'
+# Default connection option values (canonical TinkerPop 4.x GLV defaults). The
millisecond-suffixed
+# options are the primary form (mirroring the other GLVs); aiohttp itself
works in seconds, so the
+# values are converted internally.
+DEFAULT_CONNECT_TIMEOUT_MILLIS = 5000
+DEFAULT_IDLE_TIMEOUT_MILLIS = 180000
+DEFAULT_KEEP_ALIVE_TIME_MILLIS = 30000
+# Seconds equivalents retained for internal use / backwards reference.
+DEFAULT_CONNECT_TIMEOUT = DEFAULT_CONNECT_TIMEOUT_MILLIS / 1000
+DEFAULT_IDLE_TIMEOUT = DEFAULT_IDLE_TIMEOUT_MILLIS / 1000
+DEFAULT_KEEP_ALIVE_TIME = DEFAULT_KEEP_ALIVE_TIME_MILLIS / 1000
+DEFAULT_COMPRESSION = 'deflate'
+
+def _resolve_timeout_seconds(millis, seconds, default_millis):
+ """Resolve a timeout to seconds (aiohttp's unit) from the ``*_millis``
number or the idiomatic
+ unsuffixed seconds number (``None`` means not supplied for either).
Supplying both raises
+ ``ValueError``; if neither is given, ``default_millis`` is used (``None``
leaves it unset).
+ """
+ if millis is not None and seconds is not None:
+ raise ValueError("provide only one of the milliseconds option or the
seconds option, not both")
+ if seconds is not None:
+ return seconds
+ if millis is not None:
+ return millis / 1000
+ if default_millis is None:
+ return None
+ return default_millis / 1000
+
+
+def _normalize_compression(compression):
+ """Normalize the compression option to a canonical string ('none' or
'deflate').
+
+ Accepts the string forms 'none'/'deflate'.
+ """
+ if compression is None:
+ return DEFAULT_COMPRESSION
+ if isinstance(compression, str):
+ normalized = compression.lower()
+ if normalized in ('none', 'deflate'):
+ return normalized
+ raise ValueError("compression must be one of 'none', 'deflate', got
'%s'" % compression)
+ raise TypeError("compression must be a str ('none'|'deflate'), got %s" %
type(compression).__name__)
+
+
+def _keep_alive_socket_options(keep_alive_time):
+ """Build the list of socket options that enable TCP keep-alive with the
+ given idle time before probes begin. TCP_KEEPIDLE is platform dependent
+ (Linux); macOS exposes the equivalent as TCP_KEEPALIVE. Both are guarded so
+ platforms lacking the option (e.g. Windows) simply enable SO_KEEPALIVE."""
+ options = [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
+ idle_opt = getattr(socket, 'TCP_KEEPIDLE', None)
+ if idle_opt is None:
+ # macOS names the idle-before-probe option TCP_KEEPALIVE
+ idle_opt = getattr(socket, 'TCP_KEEPALIVE', None)
+ if idle_opt is not None:
+ options.append((socket.IPPROTO_TCP, idle_opt, int(keep_alive_time)))
+ return options
+
+
+def _keep_alive_socket_factory(keep_alive_time):
+ """Return a socket_factory (aiohttp >= 3.11) that applies the keep-alive
+ socket options when each connection socket is created."""
+ options = _keep_alive_socket_options(keep_alive_time)
+
+ def factory(addr_info):
+ family, type_, proto, _, _ = addr_info
+ sock = socket.socket(family=family, type=type_, proto=proto)
+ for level, optname, value in options:
+ try:
+ sock.setsockopt(level, optname, value)
+ except (OSError, AttributeError):
+ pass
+ return sock
+
+ return factory
+
class AiohttpSyncStream:
"""Wraps aiohttp's async StreamReader as a synchronous file-like object.
@@ -78,7 +154,13 @@ class AiohttpSyncStream:
class AiohttpHTTPTransport:
nest_asyncio_applied = False
- def __init__(self, call_from_event_loop=None, read_timeout=None,
write_timeout=None, **kwargs):
+ def __init__(self, call_from_event_loop=None, write_timeout=None,
+ connect_timeout_millis=None, connect_timeout=None,
+ idle_timeout_millis=None, idle_timeout=None,
+ read_timeout_millis=None, read_timeout=None,
+ keep_alive_time_millis=None, keep_alive_time=None,
+ compression=DEFAULT_COMPRESSION, proxy=None, trust_env=False,
+ max_connections=None, **kwargs):
if call_from_event_loop is not None and call_from_event_loop and not
AiohttpHTTPTransport.nest_asyncio_applied:
"""
The AiohttpTransport implementation uses the asyncio event
loop. Because of this, it cannot be called
@@ -95,17 +177,36 @@ class AiohttpHTTPTransport:
self._client_session = None
self._http_req_resp = None
self._enable_ssl = False
+ self._ssl_context = None
self._url = None
# Set all inner variables to parameters passed in.
self._aiohttp_kwargs = kwargs
self._write_timeout = write_timeout
- self._read_timeout = read_timeout
- # max_content_length is no longer enforced with streaming
deserialization, but pop it
- # to prevent it from leaking to aiohttp as an unknown kwarg
- self._aiohttp_kwargs.pop("max_content_length", None)
- if "ssl_options" in self._aiohttp_kwargs:
- self._ssl_context = self._aiohttp_kwargs.pop("ssl_options")
+
+ # Timeouts accept a millisecond number (*_millis, primary form) or the
idiomatic seconds number
+ # (unsuffixed). read_timeout defaults off; the others use the
canonical millisecond defaults.
+ self._read_timeout = _resolve_timeout_seconds(read_timeout_millis,
read_timeout, None)
+
+ # Connection-level pooling / lifecycle options.
+ self._connect_timeout =
_resolve_timeout_seconds(connect_timeout_millis, connect_timeout,
DEFAULT_CONNECT_TIMEOUT_MILLIS)
+ self._idle_timeout = _resolve_timeout_seconds(idle_timeout_millis,
idle_timeout, DEFAULT_IDLE_TIMEOUT_MILLIS)
+ self._keep_alive_time =
_resolve_timeout_seconds(keep_alive_time_millis, keep_alive_time,
DEFAULT_KEEP_ALIVE_TIME_MILLIS)
+ # Caps the aiohttp connector's simultaneous connections per Connection
+ # (the Client also sizes its Connection pool by this value).
+ self._max_connections = max_connections
+
+ # Compression negotiation. Default 'deflate' (on); advertises
+ # Accept-Encoding: deflate. Set 'none' to opt out.
+ self._compression = _normalize_compression(compression)
+
+ # HTTP proxy support routed through the ClientSession.
+ self._proxy = proxy
+ self._trust_env = trust_env
+
+ # ssl: canonical name accepting an ssl.SSLContext.
+ if "ssl" in self._aiohttp_kwargs:
+ self._ssl_context = self._aiohttp_kwargs.pop("ssl")
self._enable_ssl = True
def __del__(self):
@@ -117,26 +218,75 @@ class AiohttpHTTPTransport:
self._url = url
# Inner function to perform async connect.
async def async_connect():
- # Start client session and use it to send all HTTP requests.
Headers can be set here.
+ # Build the TCP connector with the standardized pooling / lifecycle
+ # options. keepalive_timeout maps to the idle connection timeout;
+ # the socket factory enables TCP keep-alive probes after
+ # keep_alive_time idle.
+ connector_kwargs = {}
if self._enable_ssl:
- # ssl context is established through tcp connector
- tcp_conn = aiohttp.TCPConnector(ssl_context=self._ssl_context)
- self._client_session =
aiohttp.ClientSession(connector=tcp_conn,
- headers=headers,
loop=self._loop)
- else:
- self._client_session = aiohttp.ClientSession(headers=headers,
loop=self._loop)
+ # ssl context is established through the tcp connector
+ connector_kwargs['ssl_context'] = self._ssl_context
+ if self._idle_timeout is not None:
+ connector_kwargs['keepalive_timeout'] = self._idle_timeout
+ if self._keep_alive_time is not None:
+ self._apply_keep_alive(connector_kwargs)
+ # Reflect max_connections at the aiohttp layer so the connector's
+ # simultaneous-connection limit matches the driver option.
+ if self._max_connections is not None:
+ connector_kwargs['limit'] = self._max_connections
+
+ session_kwargs = {'headers': headers, 'loop': self._loop,
+ 'trust_env': self._trust_env}
+ # Use the per-socket timeouts (sock_connect/sock_read) rather than
a
+ # whole-request total, which would abort long but legitimate
streaming
+ # responses. sock_read bounds idle time between chunks so a stalled
+ # server cannot hang forever.
+ timeout_kwargs = {}
+ if self._connect_timeout is not None:
+ timeout_kwargs['sock_connect'] = self._connect_timeout
+ if self._read_timeout is not None:
+ timeout_kwargs['sock_read'] = self._read_timeout
+ if timeout_kwargs:
+ session_kwargs['timeout'] =
aiohttp.ClientTimeout(**timeout_kwargs)
+ if connector_kwargs:
+ session_kwargs['connector'] =
aiohttp.TCPConnector(**connector_kwargs)
+
+ self._client_session = aiohttp.ClientSession(**session_kwargs)
# Execute the async connect synchronously.
self._loop.run_until_complete(async_connect())
+ def _apply_keep_alive(self, connector_kwargs):
+ """Wire TCP keep-alive into the connector via the aiohttp
socket_factory.
+ The factory sets SO_KEEPALIVE plus the per-socket idle time;
unsupported
+ platforms degrade gracefully inside the factory."""
+ connector_kwargs['socket_factory'] =
_keep_alive_socket_factory(self._keep_alive_time)
+
def write(self, message):
+ # Negotiate compression unless the caller already set Accept-Encoding:
+ # deflate advertises Accept-Encoding: deflate; none suppresses
aiohttp's
+ # auto-injected Accept-Encoding so compression is not silently
negotiated.
+ headers = message['headers']
+ has_accept_encoding = any(k.lower() == 'accept-encoding' for k in
headers)
+ skip_auto_headers = None
+ if self._compression == 'deflate':
+ if not has_accept_encoding:
+ headers['accept-encoding'] = 'deflate'
+ elif not has_accept_encoding:
+ skip_auto_headers = ['Accept-Encoding']
+
# Inner function to perform async write.
async def async_write():
+ post_kwargs = dict(self._aiohttp_kwargs)
+ if self._proxy is not None:
+ post_kwargs['proxy'] = self._proxy
+ if skip_auto_headers is not None:
+ post_kwargs['skip_auto_headers'] = skip_auto_headers
async with async_timeout.timeout(self._write_timeout):
self._http_req_resp = await
self._client_session.post(url=self._url,
data=message['payload'],
-
headers=message['headers'],
-
**self._aiohttp_kwargs)
+
headers=headers,
+
**post_kwargs)
# Execute the async write synchronously.
self._loop.run_until_complete(async_write())
@@ -183,5 +333,5 @@ class AiohttpHTTPTransport:
@property
def closed(self):
- # Connection is closed when client session is closed.
- return self._client_session.closed
+ # Connection is closed when client session is closed (or not yet
created).
+ return self._client_session is None or self._client_session.closed
diff --git a/gremlin-python/src/main/python/gremlin_python/driver/auth.py
b/gremlin-python/src/main/python/gremlin_python/driver/auth.py
index 89d6ccf889..991c5cbaca 100644
--- a/gremlin-python/src/main/python/gremlin_python/driver/auth.py
+++ b/gremlin-python/src/main/python/gremlin_python/driver/auth.py
@@ -28,30 +28,53 @@ def basic(username, password):
return interceptor
-def sigv4(region, service):
- """Returns an interceptor that signs the request with AWS SigV4."""
+def sigv4(region, service, credentials=None):
+ """Returns an interceptor that signs the request with AWS SigV4.
+
+ By default credentials are sourced from the standard AWS environment
+ variables (``AWS_ACCESS_KEY_ID``, ``AWS_SECRET_ACCESS_KEY`` and the
optional
+ ``AWS_SESSION_TOKEN``). A custom credentials provider may be supplied via
+ ``credentials``; it accepts either:
+
+ * a callable returning a botocore ``Credentials`` object (or any object
with
+ ``access_key``/``secret_key``/``token`` attributes), evaluated per
request, or
+ * a botocore ``Credentials`` object (or ``Session``) used directly.
+
+ When no provider is given, signing falls back to the environment.
+ """
import os
from boto3 import Session
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
+ def _resolve_credentials():
+ if credentials is None:
+ access_key = os.environ.get('AWS_ACCESS_KEY_ID', '')
+ secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', '')
+ session_token = os.environ.get('AWS_SESSION_TOKEN', '')
+ session = Session(
+ aws_access_key_id=access_key,
+ aws_secret_access_key=secret_key,
+ aws_session_token=session_token,
+ region_name=region
+ )
+ return session.get_credentials()
+
+ provider = credentials() if callable(credentials) else credentials
+ # A botocore Session exposes get_credentials(); a Credentials object is
+ # already usable as-is.
+ if hasattr(provider, 'get_credentials'):
+ return provider.get_credentials()
+ return provider
+
def interceptor(request):
# Ensure body is serialized so we can sign it
body_bytes = request.serialize_body()
- access_key = os.environ.get('AWS_ACCESS_KEY_ID', '')
- secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY', '')
- session_token = os.environ.get('AWS_SESSION_TOKEN', '')
-
- session = Session(
- aws_access_key_id=access_key,
- aws_secret_access_key=secret_key,
- aws_session_token=session_token,
- region_name=region
- )
+ resolved = _resolve_credentials()
sigv4_request = AWSRequest(method=request.method, url=request.url,
data=body_bytes)
- SigV4Auth(session.get_credentials(), service,
region).add_auth(sigv4_request)
+ SigV4Auth(resolved, service, region).add_auth(sigv4_request)
request.headers.update(dict(sigv4_request.headers))
return interceptor
diff --git a/gremlin-python/src/main/python/gremlin_python/driver/client.py
b/gremlin-python/src/main/python/gremlin_python/driver/client.py
index a32e8f9d0b..7cb073f696 100644
--- a/gremlin-python/src/main/python/gremlin_python/driver/client.py
+++ b/gremlin-python/src/main/python/gremlin_python/driver/client.py
@@ -38,10 +38,11 @@ __author__ = 'David M. Brown ([email protected]), Lyndon
Bauto (lyndonb@bitqui
class Client:
- def __init__(self, url, traversal_source, pool_size=None, max_workers=None,
+ def __init__(self, url, traversal_source, max_connections=128,
max_workers=None,
response_serializer=None, interceptors=None, auth=None,
- headers=None, enable_user_agent_on_connect=True,
- bulk_results=False, pdt_registry=None, **transport_kwargs):
+ enable_user_agent_on_connect=True,
+ bulk_results=False, pdt_registry=None, batch_size=None,
+ **transport_kwargs):
log.info("Creating Client with url '%s'", url)
self._closed = False
@@ -49,10 +50,12 @@ class Client:
# A raw list is safe here because Python's GIL ensures list.append and
# list.remove are atomic at the bytecode level.
self._tracked_transactions = []
- self._headers = headers
self._enable_user_agent_on_connect = enable_user_agent_on_connect
self._bulk_results = bulk_results
self._traversal_source = traversal_source
+ if batch_size is None:
+ batch_size = 64
+ self._batch_size = batch_size
if response_serializer is None:
response_serializer = serializer.GraphBinarySerializersV4()
if pdt_registry is not None:
@@ -64,14 +67,12 @@ class Client:
self._transport_kwargs = transport_kwargs
- if pool_size is None:
- pool_size = 8
- self._pool_size = pool_size
+ self._max_connections = max_connections
# This is until concurrent.futures backport 3.1.0 release
if max_workers is None:
# If your application is overlapping Gremlin I/O on multiple
threads
# consider passing kwarg max_workers = (cpu_count() or 1) * 5
- max_workers = pool_size
+ max_workers = max_connections
self._executor = ThreadPoolExecutor(max_workers=max_workers)
# Threadsafe queue
self._pool = queue.Queue()
@@ -93,7 +94,7 @@ class Client:
return self._traversal_source
def _fill_pool(self):
- for i in range(self._pool_size):
+ for i in range(self._max_connections):
conn = self._get_connection()
self._pool.put_nowait(conn)
@@ -147,9 +148,9 @@ class Client:
self._executor, self._pool,
response_serializer=self._response_serializer,
auth=self._auth, interceptors=self._interceptors,
- headers=self._headers,
enable_user_agent_on_connect=self._enable_user_agent_on_connect,
bulk_results=self._bulk_results,
+ max_connections=self._max_connections,
**self._transport_kwargs)
def submit(self, message, bindings=None, request_options=None):
@@ -181,6 +182,14 @@ class Client:
if isinstance(message, str):
log.debug("fields='%s', gremlin='%s'", str(fields), str(message))
message = request.RequestMessage(fields=fields, gremlin=message)
+ else:
+ # A caller-supplied RequestMessage must not be mutated in place:
+ # resubmitting the same message (e.g. on retry) would otherwise
+ # accumulate request_options/batchSize from prior submits. Clone
the
+ # fields dict so this submit's mutations stay local, matching the
+ # no-mutate contract of the .NET/JS drivers. Freshly built messages
+ # (the string path above) already own a private fields dict.
+ message = message._replace(fields=dict(message.fields))
conn = self._pool.get(True)
if request_options:
@@ -193,4 +202,9 @@ class Client:
bindings_val =
GremlinLang.convert_parameters_to_string(bindings_val)
message.fields['bindings'] = bindings_val
+ # Fill in the connection-level default batch size when the caller did
+ # not set a per-request batchSize.
+ if self._batch_size is not None and 'batchSize' not in message.fields:
+ message.fields['batchSize'] = self._batch_size
+
return conn.write(message)
diff --git a/gremlin-python/src/main/python/gremlin_python/driver/connection.py
b/gremlin-python/src/main/python/gremlin_python/driver/connection.py
index c9b65eab78..7a46a53a44 100644
--- a/gremlin-python/src/main/python/gremlin_python/driver/connection.py
+++ b/gremlin-python/src/main/python/gremlin_python/driver/connection.py
@@ -37,7 +37,7 @@ class Connection:
def __init__(self, url, traversal_source,
executor, pool,
response_serializer=None, auth=None, interceptors=None,
- headers=None, enable_user_agent_on_connect=True,
+ enable_user_agent_on_connect=True,
bulk_results=False, **transport_kwargs):
if callable(interceptors):
interceptors = [interceptors]
@@ -53,7 +53,10 @@ class Connection:
interceptors = (interceptors or []) + [auth]
self._url = url
- self._headers = headers
+ # Custom request headers are set via interceptors. This internal dict
+ # only carries connection-level headers managed by the driver itself
+ # (user agent, bulkResults) and is established at construction time.
+ self._headers = None
self._traversal_source = traversal_source
self._transport_kwargs = transport_kwargs
self._executor = executor
diff --git
a/gremlin-python/src/main/python/gremlin_python/driver/driver_remote_connection.py
b/gremlin-python/src/main/python/gremlin_python/driver/driver_remote_connection.py
index 8c82b3d729..3c0f094569 100644
---
a/gremlin-python/src/main/python/gremlin_python/driver/driver_remote_connection.py
+++
b/gremlin-python/src/main/python/gremlin_python/driver/driver_remote_connection.py
@@ -32,17 +32,17 @@ __author__ = 'David M. Brown ([email protected]), Lyndon
Bauto (lyndonb@bitqui
class DriverRemoteConnection(RemoteConnection):
def __init__(self, url, traversal_source="g",
- pool_size=None, max_workers=None,
+ max_connections=128, max_workers=None,
response_serializer=None, interceptors=None, auth=None,
- headers=None, enable_user_agent_on_connect=True,
- bulk_results=False, pdt_registry=None, **transport_kwargs):
+ enable_user_agent_on_connect=True,
+ bulk_results=False, pdt_registry=None, batch_size=None,
+ **transport_kwargs):
log.info("Creating DriverRemoteConnection with url '%s'", str(url))
self.__url = url
self.__traversal_source = traversal_source
- self.__pool_size = pool_size
+ self.__max_connections = max_connections
self.__max_workers = max_workers
self.__auth = auth
- self.__headers = headers
self.__enable_user_agent_on_connect = enable_user_agent_on_connect
self.__bulk_results = bulk_results
self.__transport_kwargs = transport_kwargs
@@ -51,14 +51,14 @@ class DriverRemoteConnection(RemoteConnection):
if response_serializer is None:
response_serializer = serializer.GraphBinarySerializersV4()
self._client = client.Client(url, traversal_source,
- pool_size=pool_size,
+ max_connections=max_connections,
max_workers=max_workers,
response_serializer=response_serializer,
interceptors=interceptors, auth=auth,
- headers=headers,
enable_user_agent_on_connect=enable_user_agent_on_connect,
bulk_results=bulk_results,
pdt_registry=pdt_registry,
+ batch_size=batch_size,
**transport_kwargs)
self._url = self._client._url
self._traversal_source = self._client._traversal_source
diff --git a/gremlin-python/src/main/python/pyproject.toml
b/gremlin-python/src/main/python/pyproject.toml
index 8332cb6d9e..8510bc701b 100644
--- a/gremlin-python/src/main/python/pyproject.toml
+++ b/gremlin-python/src/main/python/pyproject.toml
@@ -29,7 +29,7 @@ maintainers = [{name = "Apache TinkerPop", email =
"[email protected]"}]
requires-python = ">=3.10,<3.14"
dependencies = [
"nest_asyncio",
- "aiohttp>=3.8.0,<4.0.0",
+ "aiohttp>=3.11.0,<4.0.0",
"aenum>=1.4.5,<4.0.0",
"isodate>=0.6.0,<1.0.0",
"boto3",
diff --git a/gremlin-python/src/main/python/tests/integration/conftest.py
b/gremlin-python/src/main/python/tests/integration/conftest.py
index 8776859ed2..fb505e8cb8 100644
--- a/gremlin-python/src/main/python/tests/integration/conftest.py
+++ b/gremlin-python/src/main/python/tests/integration/conftest.py
@@ -98,7 +98,7 @@ def authenticated_client(request):
ssl_opts.verify_mode = ssl.CERT_NONE
client = Client(basic_url, 'gmodern',
auth=basic('stephen', 'password'),
- ssl_options=ssl_opts)
+ ssl=ssl_opts)
else:
raise ValueError("Invalid authentication option - " +
request.param)
except OSError:
@@ -155,7 +155,7 @@ def remote_connection_authenticated(request):
ssl_opts.verify_mode = ssl.CERT_NONE
remote_conn = DriverRemoteConnection(basic_url, 'gmodern',
auth=basic('stephen',
'password'),
- ssl_options=ssl_opts)
+ ssl=ssl_opts)
else:
raise ValueError("Invalid authentication option - " +
request.param)
except OSError:
diff --git
a/gremlin-python/src/main/python/tests/integration/driver/test_client.py
b/gremlin-python/src/main/python/tests/integration/driver/test_client.py
index cc972204d0..0981e7737b 100644
--- a/gremlin-python/src/main/python/tests/integration/driver/test_client.py
+++ b/gremlin-python/src/main/python/tests/integration/driver/test_client.py
@@ -70,6 +70,33 @@ def test_client_simple_eval(client):
assert client.submit('g.inject(2)').all().result()[0] == 2
+def test_client_deflate_compression_round_trip():
+ # With compression enabled the driver must (1) advertise Accept-Encoding:
deflate
+ # on the request and (2) transparently decompress the server's
deflate-compressed
+ # response. The interceptor captures a reference to the outgoing headers
dict, which
+ # the transport mutates in place to add Accept-Encoding before sending.
Use a large,
+ # repetitive payload so the response is actually compressed and spans
multiple buffer fills.
+ captured_headers = {}
+
+ def capture(http_request):
+ # keep a reference; the transport adds Accept-Encoding to this same
dict at send time
+ captured_headers['ref'] = http_request.headers
+
+ client = Client(test_no_auth_url, 'g', compression='deflate',
interceptors=capture)
+ try:
+ result = client.submit('[" ".repeat(200000), " ".repeat(100000)]',
+ request_options={'language':
'gremlin-groovy'}).all().result()
+ # (2) decompression succeeded end to end
+ assert len(result[0]) == 200000
+ assert len(result[1]) == 100000
+ # (1) the request advertised deflate
+ sent = captured_headers.get('ref', {})
+ assert sent.get('accept-encoding') == 'deflate', \
+ "expected Accept-Encoding: deflate on the request, got %r" %
sent.get('accept-encoding')
+ finally:
+ client.close()
+
+
def test_client_simple_eval_bindings(client):
assert client.submit('g.V(x).values("age")', {'x': 1}).all().result()[0]
== 29
@@ -121,8 +148,8 @@ def test_bad_serialization(client):
def test_client_connection_pool_after_error(client):
- # Overwrite fixture with pool_size=1 client
- client = Client(test_no_auth_url, 'gmodern', pool_size=1)
+ # Overwrite fixture with max_connections=1 client
+ client = Client(test_no_auth_url, 'gmodern', max_connections=1)
try:
# should fire an exception
@@ -149,7 +176,7 @@ def test_client_no_hang_if_submit_on_closed(client):
def test_client_close_all_connection_in_pool(client):
- client = Client(test_no_auth_url, 'g', pool_size=1)
+ client = Client(test_no_auth_url, 'g', max_connections=1)
assert client.available_pool_size == 1
client.submit('g.inject(4)').all().result()
client.close()
@@ -158,7 +185,7 @@ def test_client_close_all_connection_in_pool(client):
def test_client_side_timeout_set_for_aiohttp(client):
client = Client(test_no_auth_url, 'gmodern',
- read_timeout=1, write_timeout=1)
+ read_timeout_millis=1000, write_timeout=1)
try:
# should fire an exception
@@ -282,8 +309,8 @@ def test_client_async(client):
def test_connection_share(client):
- # Overwrite fixture with pool_size=1 client
- client = Client(test_no_auth_url, 'gmodern', pool_size=1)
+ # Overwrite fixture with max_connections=1 client
+ client = Client(test_no_auth_url, 'gmodern', max_connections=1)
g = GraphTraversalSource(Graph(), TraversalStrategies())
t = g.V()
message = create_basic_request_message(t)
@@ -294,7 +321,7 @@ def test_connection_share(client):
result_set2 = future2.result()
assert len(result_set2.all().result()) == 6
- # This future has to finish for the second to yield result - pool_size=1
+ # This future has to finish for the second to yield result -
max_connections=1
assert future.done()
result_set = future.result()
assert len(result_set.all().result()) == 6
@@ -305,7 +332,7 @@ def test_multi_conn_pool(client):
t = g.V()
message = create_basic_request_message(t)
message2 = create_basic_request_message(t)
- client = Client(test_no_auth_url, 'g', pool_size=1)
+ client = Client(test_no_auth_url, 'g', max_connections=1)
future = client.submit_async(message)
future2 = client.submit_async(message2)
@@ -604,7 +631,7 @@ def
test_auto_serializes_request_message_with_interceptor_mutation():
http_request.body = RequestMessage(fields={"g": "gmodern"},
gremlin="g.inject(99)")
client = Client(test_no_auth_url, 'gmodern',
- pool_size=1, interceptors=swap_query)
+ max_connections=1, interceptors=swap_query)
try:
result = client.submit("g.inject(1)").next()
assert 99 == result
@@ -623,7 +650,7 @@ def test_interceptor_errors_propagate():
raise RuntimeError("interceptor broke")
client = Client(test_no_auth_url, 'gmodern',
- pool_size=1, interceptors=failing_interceptor)
+ max_connections=1, interceptors=failing_interceptor)
try:
# First request should fail with interceptor error
try:
diff --git
a/gremlin-python/src/main/python/tests/integration/driver/test_client_behavior.py
b/gremlin-python/src/main/python/tests/integration/driver/test_client_behavior.py
index 149f2735f4..013280e39f 100644
---
a/gremlin-python/src/main/python/tests/integration/driver/test_client_behavior.py
+++
b/gremlin-python/src/main/python/tests/integration/driver/test_client_behavior.py
@@ -136,7 +136,7 @@ def test_should_handle_slow_response(socket_server_client):
def test_should_timeout_when_server_never_responds():
# Use a short read_timeout so the test fails fast instead of waiting for
# aiohttp's 5-minute default. aiohttp surfaces this as
asyncio.TimeoutError.
- client = Client(url, 'g', read_timeout=2)
+ client = Client(url, 'g', read_timeout_millis=2000)
try:
with pytest.raises(asyncio.TimeoutError):
client.submit(GREMLIN_NO_RESPONSE).all().result()
diff --git
a/gremlin-python/src/main/python/tests/integration/driver/test_driver_remote_connection_threaded.py
b/gremlin-python/src/main/python/tests/integration/driver/test_driver_remote_connection_threaded.py
index 898872d1f8..d9edf298f0 100644
---
a/gremlin-python/src/main/python/tests/integration/driver/test_driver_remote_connection_threaded.py
+++
b/gremlin-python/src/main/python/tests/integration/driver/test_driver_remote_connection_threaded.py
@@ -63,7 +63,7 @@ def _executor(q, conn):
if not conn:
# This isn't a fixture so close manually
close = True
- conn = DriverRemoteConnection(test_no_auth_url, 'gmodern', pool_size=4)
+ conn = DriverRemoteConnection(test_no_auth_url, 'gmodern',
max_connections=4)
try:
g = traversal().with_(conn)
future = g.V().promise()
diff --git
a/gremlin-python/src/main/python/tests/unit/driver/test_client_options.py
b/gremlin-python/src/main/python/tests/unit/driver/test_client_options.py
new file mode 100644
index 0000000000..215891e966
--- /dev/null
+++ b/gremlin-python/src/main/python/tests/unit/driver/test_client_options.py
@@ -0,0 +1,264 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Unit tests for connection-level options surfaced on Client,
+DriverRemoteConnection, and the SigV4 credentials-provider variant."""
+
+import warnings
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from gremlin_python.driver.client import Client
+from gremlin_python.driver.connection import Connection
+from gremlin_python.driver.request import RequestMessage
+
+
+# Patch Connection so Client._fill_pool does not attempt any real connections.
+def _make_client(**kwargs):
+ with patch('gremlin_python.driver.client.connection.Connection',
MagicMock()):
+ return Client('http://localhost:8182/gremlin', 'g', **kwargs)
+
+
+class TestMaxConnections:
+
+ def test_default_is_128(self):
+ client = _make_client()
+ assert client._max_connections == 128
+
+ def test_explicit_max_connections(self):
+ client = _make_client(max_connections=4)
+ assert client._max_connections == 4
+
+
+class TestBatchSize:
+
+ def test_default_is_64(self):
+ client = _make_client()
+ assert client._batch_size == 64
+
+ def test_explicit_value(self):
+ client = _make_client(batch_size=200)
+ assert client._batch_size == 200
+
+ def test_fills_batch_size_when_unset(self):
+ client = _make_client(batch_size=64)
+ conn = MagicMock()
+ client._pool.get = MagicMock(return_value=conn)
+ client.submit_async('g.V()')
+ sent = conn.write.call_args[0][0]
+ assert sent.fields['batchSize'] == 64
+
+ def test_does_not_override_per_request_batch_size(self):
+ client = _make_client(batch_size=64)
+ conn = MagicMock()
+ client._pool.get = MagicMock(return_value=conn)
+ client.submit_async('g.V()', request_options={'batchSize': 10})
+ sent = conn.write.call_args[0][0]
+ assert sent.fields['batchSize'] == 10
+
+
+class TestMaxConnectionsThreadedToTransport:
+
+ def test_max_connections_forwarded_to_connection(self):
+ # Client must pass max_connections through to each Connection so the
+ # aiohttp connector limit can be set, in addition to sizing the pool.
+ captured = {}
+
+ def fake_connection(*args, **kwargs):
+ captured.update(kwargs)
+ return MagicMock()
+
+ with patch('gremlin_python.driver.client.connection.Connection',
+ side_effect=fake_connection):
+ Client('http://localhost:8182/gremlin', 'g', max_connections=5)
+ assert captured.get('max_connections') == 5
+
+ def test_default_max_connections_forwarded_to_connection(self):
+ captured = {}
+
+ def fake_connection(*args, **kwargs):
+ captured.update(kwargs)
+ return MagicMock()
+
+ with patch('gremlin_python.driver.client.connection.Connection',
+ side_effect=fake_connection):
+ Client('http://localhost:8182/gremlin', 'g')
+ assert captured.get('max_connections') == 128
+
+
+class TestRequestMessageNoMutation:
+
+ def test_resubmit_does_not_accumulate_fields(self):
+ # A caller-supplied RequestMessage must not be mutated in place: the
+ # second submit with different options must not see the first submit's
+ # batchSize/request_options leak in.
+ client = _make_client(batch_size=64)
+ conn = MagicMock()
+ client._pool.get = MagicMock(return_value=conn)
+
+ original = RequestMessage(fields={'g': 'g'}, gremlin='g.V()')
+
+ client.submit_async(original, request_options={'evaluationTimeout':
1000})
+ # The caller's original message must be untouched.
+ assert 'batchSize' not in original.fields
+ assert 'evaluationTimeout' not in original.fields
+ assert original.fields == {'g': 'g'}
+
+ # Resubmit the same message with different options; it must not carry
+ # over state from the first submit.
+ client.submit_async(original, request_options={'batchSize': 5})
+ sent = conn.write.call_args[0][0]
+ assert sent.fields['batchSize'] == 5
+ assert 'evaluationTimeout' not in sent.fields
+ # And the original is still pristine.
+ assert original.fields == {'g': 'g'}
+
+ def test_batch_size_not_written_to_caller_message(self):
+ # Use a non-default value so the assertion proves the configured
+ # batch_size flowed through, not the library default (64).
+ client = _make_client(batch_size=32)
+ conn = MagicMock()
+ client._pool.get = MagicMock(return_value=conn)
+ original = RequestMessage(fields={'g': 'g'}, gremlin='g.V()')
+ client.submit_async(original)
+ # default batchSize was applied to the sent clone, not the caller's msg
+ sent = conn.write.call_args[0][0]
+ assert sent.fields['batchSize'] == 32
+ assert 'batchSize' not in original.fields
+
+
+class TestHeadersKwargRemoved:
+
+ def test_client_rejects_headers_kwarg(self):
+ # headers is no longer a named parameter; it lands in transport_kwargs
+ # and would be forwarded to the transport. Verify Client has no
_headers.
+ client = _make_client()
+ assert not hasattr(client, '_headers')
+
+ def test_connection_has_no_headers_param(self):
+ import inspect
+ params = inspect.signature(Connection.__init__).parameters
+ assert 'headers' not in params
+
+ def test_connection_internal_headers_default_none(self):
+ conn = Connection(
+ url='http://localhost:8182/gremlin',
+ traversal_source='g',
+ executor=MagicMock(),
+ pool=MagicMock(),
+ enable_user_agent_on_connect=False,
+ )
+ assert conn._headers is None
+
+
+class TestDriverRemoteConnectionOptions:
+
+ def test_max_connections_forwarded(self):
+ from gremlin_python.driver.driver_remote_connection import
DriverRemoteConnection
+ with
patch('gremlin_python.driver.driver_remote_connection.client.Client') as
MockClient:
+ instance = MockClient.return_value
+ instance._url = 'http://localhost:8182/gremlin'
+ instance._traversal_source = 'g'
+ DriverRemoteConnection('http://localhost:8182/gremlin', 'g',
+ max_connections=16, batch_size=32)
+ _, kwargs = MockClient.call_args
+ assert kwargs['max_connections'] == 16
+ assert kwargs['batch_size'] == 32
+
+ def test_no_headers_param(self):
+ import inspect
+ from gremlin_python.driver.driver_remote_connection import
DriverRemoteConnection
+ params = inspect.signature(DriverRemoteConnection.__init__).parameters
+ assert 'headers' not in params
+ assert 'pool_size' not in params
+ assert 'max_connections' in params
+
+ def test_submit_async_signature_single_arg(self):
+ import inspect
+ from gremlin_python.driver.driver_remote_connection import
DriverRemoteConnection
+ params =
list(inspect.signature(DriverRemoteConnection.submit_async).parameters)
+ # self + gremlin_lang only
+ assert params == ['self', 'gremlin_lang']
+
+
+
+class TestSigV4CredentialsProvider:
+
+ def _fake_request(self):
+ req = MagicMock()
+ req.method = 'POST'
+ req.url = 'http://localhost:8182/gremlin'
+ req.serialize_body.return_value = b'{}'
+ req.headers = {}
+ return req
+
+ def test_callable_credentials_provider_used(self):
+ from gremlin_python.driver.auth import sigv4
+
+ sentinel_creds = object()
+ provider = MagicMock(return_value=sentinel_creds)
+
+ with patch('botocore.auth.SigV4Auth') as MockAuth:
+ signer = MockAuth.return_value
+ signer.add_auth = MagicMock()
+ interceptor = sigv4('us-east-1', 'neptune-db',
credentials=provider)
+ req = self._fake_request()
+ interceptor(req)
+
+ provider.assert_called_once()
+ # SigV4Auth must be constructed with the credentials returned by the
provider
+ args, _ = MockAuth.call_args
+ assert args[0] is sentinel_creds
+
+ def test_credentials_object_with_get_credentials(self):
+ from gremlin_python.driver.auth import sigv4
+
+ resolved = object()
+
+ class FakeSession: # not callable; mimics a botocore Session
+ def get_credentials(self_inner):
+ return resolved
+
+ with patch('botocore.auth.SigV4Auth') as MockAuth:
+ MockAuth.return_value.add_auth = MagicMock()
+ interceptor = sigv4('us-east-1', 'neptune-db',
credentials=FakeSession())
+ interceptor(self._fake_request())
+
+ args, _ = MockAuth.call_args
+ assert args[0] is resolved
+
+ def test_env_fallback_when_no_provider(self):
+ from gremlin_python.driver.auth import sigv4
+
+ resolved = object()
+ with patch('boto3.Session') as MockSession, \
+ patch('botocore.auth.SigV4Auth') as MockAuth:
+ MockSession.return_value.get_credentials.return_value = resolved
+ MockAuth.return_value.add_auth = MagicMock()
+ interceptor = sigv4('us-east-1', 'neptune-db')
+ interceptor(self._fake_request())
+
+ # With no provider supplied, credentials must be resolved from the AWS
+ # environment via a boto3 Session, and those resolved credentials must
be
+ # the ones handed to the SigV4 signer.
+ MockSession.assert_called_once()
+ MockSession.return_value.get_credentials.assert_called_once()
+ args, _ = MockAuth.call_args
+ assert args[0] is resolved
diff --git
a/gremlin-python/src/main/python/tests/unit/driver/test_connection_options.py
b/gremlin-python/src/main/python/tests/unit/driver/test_connection_options.py
new file mode 100644
index 0000000000..4411db936c
--- /dev/null
+++
b/gremlin-python/src/main/python/tests/unit/driver/test_connection_options.py
@@ -0,0 +1,312 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Unit tests for the standardized GLV connection options (TinkerPop 4.x).
+
+These cover option wiring on the aiohttp transport, the Client pool, the
+Connection request path, the DriverRemoteConnection surface, and the SigV4
+credentials-provider variant. They avoid any network I/O.
+"""
+
+import socket
+import warnings
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from gremlin_python.driver.aiohttp.transport import (
+ AiohttpHTTPTransport,
+ _normalize_compression,
+ _keep_alive_socket_options,
+ _keep_alive_socket_factory,
+ DEFAULT_CONNECT_TIMEOUT,
+ DEFAULT_IDLE_TIMEOUT,
+ DEFAULT_KEEP_ALIVE_TIME,
+)
+
+
+# ---------------------------------------------------------------------------
+# Compression normalization
+# ---------------------------------------------------------------------------
+
+class TestCompressionNormalization:
+
+ def test_default_is_deflate(self):
+ assert _normalize_compression(None) == 'deflate'
+
+ def test_string_none(self):
+ assert _normalize_compression('none') == 'none'
+
+ def test_string_deflate(self):
+ assert _normalize_compression('deflate') == 'deflate'
+
+ def test_case_insensitive(self):
+ assert _normalize_compression('DEFLATE') == 'deflate'
+
+ def test_invalid_string_raises(self):
+ with pytest.raises(ValueError):
+ _normalize_compression('gzip')
+
+ def test_invalid_type_raises(self):
+ with pytest.raises(TypeError):
+ _normalize_compression(5)
+
+
+# ---------------------------------------------------------------------------
+# Keep-alive socket option construction
+# ---------------------------------------------------------------------------
+
+class TestKeepAliveSocketOptions:
+
+ def test_includes_so_keepalive(self):
+ opts = _keep_alive_socket_options(30)
+ assert (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) in opts
+
+ def test_includes_idle_when_platform_supports(self):
+ idle_opt = getattr(socket, 'TCP_KEEPIDLE', None) or getattr(socket,
'TCP_KEEPALIVE', None)
+ opts = _keep_alive_socket_options(45)
+ if idle_opt is not None:
+ assert (socket.IPPROTO_TCP, idle_opt, 45) in opts
+ else:
+ # Only SO_KEEPALIVE is present when no idle option exists (e.g.
Windows)
+ assert len(opts) == 1
+
+ def test_socket_factory_applies_options(self):
+ factory = _keep_alive_socket_factory(30)
+ addr_info = (socket.AF_INET, socket.SOCK_STREAM, 0, '', ('127.0.0.1',
0))
+ sock = factory(addr_info)
+ try:
+ assert sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0
+ finally:
+ sock.close()
+
+
+# ---------------------------------------------------------------------------
+# Transport option wiring
+# ---------------------------------------------------------------------------
+
+class TestTransportDefaults:
+
+ def test_defaults(self):
+ t = AiohttpHTTPTransport()
+ assert t._connect_timeout == DEFAULT_CONNECT_TIMEOUT == 5
+ assert t._idle_timeout == DEFAULT_IDLE_TIMEOUT == 180
+ assert t._keep_alive_time == DEFAULT_KEEP_ALIVE_TIME == 30
+ assert t._compression == 'deflate'
+ assert t._proxy is None
+ assert t._trust_env is False
+ t.close()
+
+ def test_explicit_values(self):
+ t = AiohttpHTTPTransport(connect_timeout_millis=2000,
idle_timeout_millis=60000,
+ keep_alive_time_millis=15000,
compression='deflate',
+ proxy='http://proxy:3128', trust_env=True)
+ assert t._connect_timeout == 2
+ assert t._idle_timeout == 60
+ assert t._keep_alive_time == 15
+ assert t._compression == 'deflate'
+ assert t._proxy == 'http://proxy:3128'
+ assert t._trust_env is True
+ t.close()
+
+ def test_timeouts_via_seconds(self):
+ t = AiohttpHTTPTransport(connect_timeout=2,
+ idle_timeout=60,
+ read_timeout=30,
+ keep_alive_time=15)
+ # the unsuffixed seconds form sets the same internal seconds values as
the *_millis form
+ assert t._connect_timeout == 2
+ assert t._idle_timeout == 60
+ assert t._read_timeout == 30
+ assert t._keep_alive_time == 15
+ t.close()
+
+ def test_timeout_rejects_both_millis_and_seconds(self):
+ with pytest.raises(ValueError):
+ AiohttpHTTPTransport(connect_timeout_millis=2000,
connect_timeout=2)
+
+ def test_ssl_canonical_option(self):
+ import ssl as ssl_module
+ ctx = ssl_module.SSLContext(ssl_module.PROTOCOL_TLS_CLIENT)
+ t = AiohttpHTTPTransport(ssl=ctx)
+ assert t._enable_ssl is True
+ assert t._ssl_context is ctx
+ t.close()
+
+ def test_max_content_length_removed(self):
+ # max_content_length is no longer accepted; it would now be forwarded
to
+ # aiohttp as an unknown kwarg. Verify it is not silently stored
anywhere.
+ t = AiohttpHTTPTransport()
+ assert 'max_content_length' not in t._aiohttp_kwargs
+ t.close()
+
+
+class TestTransportConnectWiring:
+
+ def _connect_capture(self, **transport_kwargs):
+ """Construct a transport and capture the kwargs passed to TCPConnector
+ and ClientSession during connect(), without doing real I/O."""
+ t = AiohttpHTTPTransport(**transport_kwargs)
+ captured = {}
+
+ class FakeConnector:
+ def __init__(self, **kwargs):
+ captured['connector'] = kwargs
+
+ class FakeSession:
+ def __init__(self, **kwargs):
+ captured['session'] = kwargs
+
+ with patch('aiohttp.TCPConnector', FakeConnector), \
+ patch('aiohttp.ClientSession', FakeSession):
+ t.connect('http://localhost:8182/gremlin', headers={'a': 'b'})
+ t._client_session = None # FakeSession isn't a real session
+ return captured, t
+
+ def test_idle_timeout_maps_to_keepalive_timeout(self):
+ captured, t = self._connect_capture(idle_timeout_millis=90000)
+ assert captured['connector']['keepalive_timeout'] == 90
+ t.close()
+
+ def test_connect_timeout_sets_sock_connect(self):
+ captured, t = self._connect_capture(connect_timeout_millis=3000)
+ timeout = captured['session']['timeout']
+ assert timeout.sock_connect == 3
+ t.close()
+
+ def test_keep_alive_wires_socket_factory(self):
+ captured, t = self._connect_capture(keep_alive_time_millis=30000)
+ # aiohttp >= 3.11 is the declared floor, so socket_factory is always
used.
+ assert 'socket_factory' in captured['connector']
+ assert 'socket_options' not in captured['connector']
+ t.close()
+
+ def test_max_connections_sets_connector_limit(self):
+ captured, t = self._connect_capture(max_connections=7)
+ assert captured['connector']['limit'] == 7
+ t.close()
+
+ def test_no_max_connections_leaves_limit_unset(self):
+ captured, t = self._connect_capture()
+ assert 'limit' not in captured['connector']
+ t.close()
+
+ def test_read_timeout_sets_sock_read(self):
+ captured, t = self._connect_capture(connect_timeout_millis=3000,
read_timeout_millis=11000)
+ timeout = captured['session']['timeout']
+ assert timeout.sock_connect == 3
+ assert timeout.sock_read == 11
+ t.close()
+
+ def test_default_timeout_has_no_unbounded_total(self):
+ # Defaults must still arm a socket-connect bound; building
ClientTimeout
+ # from the socket knobs (not a whole-request total) keeps streaming
safe.
+ captured, t = self._connect_capture()
+ timeout = captured['session']['timeout']
+ assert timeout.sock_connect == 5
+ assert timeout.total is None
+ t.close()
+
+ def test_trust_env_passed_to_session(self):
+ captured, t = self._connect_capture(trust_env=True)
+ assert captured['session']['trust_env'] is True
+ t.close()
+
+ def test_ssl_context_passed_to_connector(self):
+ import ssl as ssl_module
+ ctx = ssl_module.SSLContext(ssl_module.PROTOCOL_TLS_CLIENT)
+ captured, t = self._connect_capture(ssl=ctx)
+ assert captured['connector']['ssl_context'] is ctx
+ t.close()
+
+
+class TestTransportWriteCompression:
+
+ def _make_transport(self, **kwargs):
+ t = AiohttpHTTPTransport(**kwargs)
+ t._url = 'http://localhost:8182/gremlin'
+
+ captured = {}
+
+ async def fake_post(url, data, headers, **post_kwargs):
+ captured['headers'] = headers
+ captured['post_kwargs'] = post_kwargs
+ return MagicMock()
+
+ session = MagicMock()
+ session.post = fake_post
+ t._client_session = session
+ return t, captured
+
+ def test_default_offers_deflate(self):
+ # With the default compression ('deflate'), the transport advertises
+ # Accept-Encoding: deflate and does not skip the auto-header.
+ t, captured = self._make_transport()
+ t.write({'headers': {'accept': 'x'}, 'payload': b'{}'})
+ assert captured['headers'].get('accept-encoding') == 'deflate'
+ assert 'skip_auto_headers' not in captured['post_kwargs']
+ t._client_session = None
+ t.close()
+
+ def test_deflate_sets_accept_encoding(self):
+ t, captured = self._make_transport(compression='deflate')
+ t.write({'headers': {'accept': 'x'}, 'payload': b'{}'})
+ assert captured['headers'].get('accept-encoding') == 'deflate'
+ # deflate must NOT skip the Accept-Encoding auto-header
+ assert 'skip_auto_headers' not in captured['post_kwargs']
+ t._client_session = None
+ t.close()
+
+ def test_none_suppresses_auto_accept_encoding(self):
+ # compression='none' must stop aiohttp from auto-injecting
+ # Accept-Encoding (gzip, deflate, ...), which would otherwise silently
+ # negotiate compression. No explicit header is added; instead the
+ # auto-header is skipped.
+ t, captured = self._make_transport(compression='none')
+ t.write({'headers': {'accept': 'x'}, 'payload': b'{}'})
+ assert 'accept-encoding' not in captured['headers']
+ assert captured['post_kwargs'].get('skip_auto_headers') ==
['Accept-Encoding']
+ t._client_session = None
+ t.close()
+
+ def test_none_respects_explicit_accept_encoding(self):
+ # A caller/interceptor that explicitly sets Accept-Encoding is honored,
+ # and the auto-header skip is not applied so their value is sent.
+ t, captured = self._make_transport(compression='none')
+ t.write({'headers': {'Accept-Encoding': 'gzip'}, 'payload': b'{}'})
+ assert captured['headers']['Accept-Encoding'] == 'gzip'
+ assert 'skip_auto_headers' not in captured['post_kwargs']
+ t._client_session = None
+ t.close()
+
+ def test_deflate_respects_existing_header(self):
+ t, captured = self._make_transport(compression='deflate')
+ t.write({'headers': {'Accept-Encoding': 'gzip'}, 'payload': b'{}'})
+ assert captured['headers']['Accept-Encoding'] == 'gzip'
+ assert 'accept-encoding' not in captured['headers']
+ assert 'skip_auto_headers' not in captured['post_kwargs']
+ t._client_session = None
+ t.close()
+
+ def test_proxy_passed_to_post(self):
+ t, captured = self._make_transport(proxy='http://proxy:3128')
+ t.write({'headers': {}, 'payload': b'{}'})
+ assert captured['post_kwargs'].get('proxy') == 'http://proxy:3128'
+ t._client_session = None
+ t.close()