This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4f0a33d3 CASSSIDECAR-368: Fix request execution continues on wrong thread (#298)
4f0a33d3 is described below
commit 4f0a33d36c51c7a3c60308aee054271c958327c2
Author: Yifan Cai <[email protected]>
AuthorDate: Wed Dec 10 15:47:01 2025 -0800
CASSSIDECAR-368: Fix request execution continues on wrong thread (#298)
Patch by Yifan Cai; Reviewed by Francisco Guerrero for CASSSIDECAR-368
---
CHANGES.txt | 1 +
CONTRIBUTING.md | 25 ++
docs/vertx-threading-model.md | 136 ++++++++++
.../RoleBasedAuthorizationIntegrationTest.java | 6 +-
.../routes/InvalidateCacheIntegrationTest.java | 16 +-
.../authorization/CachedAuthorizationHandler.java | 43 ++--
.../sidecar/handlers/InvalidateCacheHandler.java | 6 +-
.../cassandra/sidecar/routes/RouteBuilder.java | 8 +-
.../cassandra/sidecar/utils/CacheFactory.java | 24 +-
.../apache/cassandra/sidecar/VertxContextTest.java | 285 +++++++++++++++++++++
.../CachedAuthorizationHandlerTest.java | 34 ++-
.../handlers/InvalidateCacheHandlerTest.java | 7 +-
.../cassandra/sidecar/utils/CacheFactoryTest.java | 12 +-
13 files changed, 533 insertions(+), 70 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index dc96e0fb..7c54aac1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.3.0
-----
+ * Fix request execution continues on wrong thread (CASSSIDECAR-368)
* CachedAuthorizationHandler should pause and resume request while performing
authZ operations (CASSSIDECAR-367)
* Added system disk information endpoint to Cassandra Sidecar
(CASSSIDECAR-366)
* Adding support for quoted tables and keyspaces in snapshot cleanup
(CASSSIDECAR-388)
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index acd18b3f..46e2cffc 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -33,6 +33,7 @@ We warmly welcome and appreciate contributions from the
community.
* [Introducing new APIs](#new-apis)
* [Asynchronous Programming](#async-programming)
* [Thread Pool Model](#thread-pools)
+ * [Vert.x Threading Model](#vertx-threading)
* [One-shot Timers and Periodic Timers](#timers)
* [Guice in Sidecar](#guice)
* [Handler Chaining](#chaining-handlers)
@@ -198,6 +199,30 @@ The `service` worker pool has the name
`sidecar-worker-pool`.
The `internal` worker pool has the name `sidecar-internal-worker-pool`.
+### <a name="vertx-threading"></a>Vert.x Threading Model
+
+**Critical**: Each HTTP request MUST be processed by exactly ONE Vert.x event loop thread throughout its entire lifecycle.
+
+When integrating external async APIs (Caffeine cache, AWS SDK, CompletableFuture-based libraries), their callbacks execute on
+arbitrary threads. This violates Vert.x's threading model and can cause race conditions, data corruption, and intermittent bugs.
+
+**Pattern**: Always capture the Vert.x context at handler entry and restore it before accessing request state:
+
+```java
+public void handle(RoutingContext context) {
+ Context originCtx = Vertx.currentContext(); // Capture context
+
+ externalAsyncOperation()
+ .onSuccess(result -> {
+ originCtx.runOnContext(v -> { // Restore context
+ context.next();
+ });
+ });
+}
+```
+
+For detailed examples and best practices, see the [Vert.x Threading Model Guide](docs/vertx-threading-model.md).
+
### <a name="timers"></a>One-shot Timers and Periodic Timers
Use vertx APIs to set one-shot timers and periodic timers. If you need to
execute a one-time operation in the future,
diff --git a/docs/vertx-threading-model.md b/docs/vertx-threading-model.md
new file mode 100644
index 00000000..15624a62
--- /dev/null
+++ b/docs/vertx-threading-model.md
@@ -0,0 +1,136 @@
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+# Vert.x Threading Model Guide
+
+## The Golden Rule
+
+**Each HTTP request MUST be processed by exactly ONE Vert.x event loop thread throughout its entire lifecycle.**
+
+Vert.x makes optimizations based on this assumption. Violating it breaks operation ordering and causes race conditions.
+
+## Real Example: CASSSIDECAR-368 SSTable Upload Corruption
+
+### The Bug: Data Loss from Threading Violation
+
+When request processing jumped between event loops, 8 KiB chunks at the beginning of files were lost:
+
+**Sequence of events (WRONG - operations split across 2 threads):**
+
+```
+T1: Request pause scheduled in event-loop A
+T2: After buffering 8 chunks, connection finally paused (pause is async)
+T3: Shared authFuture completes → continuation runs in event-loop B ← THREAD JUMP
+T4: Request resume scheduled from event-loop B (queued on event-loop A)
+T5: Event-loop A drains data, but data handler NOT installed yet
+ → eventHandler is null
+ → All chunks DISCARDED (8 KiB multiples missing)
+T6: Event-loop B continues, enters SSTableUploadHandler, attaches data handler (too late)
+```
+
+**After fix (CORRECT - all operations on same thread):**
+
+```
+T1: Request pause scheduled in event-loop A
+T2: After buffering 8 chunks, connection finally paused
+T3: Shared authFuture completes → continuation scheduled back to event-loop A ← STAYS ON A
+T4: Enter SSTableUploadHandler, pause request (scheduled)
+T5: PipeImpl.to attaches data handler and schedules resume
+T6: Data drains and handler receives all chunks ✓
+```
+
+## Understanding Vert.x Contexts
+
+Most applications don't need tight interactions with a context, but sometimes it's useful to access them, especially when your application uses another library that performs a callback on its own thread and you want to execute code in the original context.
+
+### Accessing the Current Context
+
+You can get the current context in two ways:
+
+```java
+// Option 1: Always returns a context (creates one if needed)
+Context context = vertx.getOrCreateContext();
+
+// Option 2: Returns null if current thread is not associated with a context
+Context context = Vertx.currentContext();
+```
+
+The first method never returns null and will create a context if needed. The second method might return null if the current thread is not associated with a context.
+
+### Using Context to Run Code
+
+After obtaining a context, you can use it to run code in that context:
+
+```java
+public void integrateWithExternalSystem(Handler<Event> handler) {
+ // Capture the current context
+ Context context = vertx.getOrCreateContext();
+
+ // Run the event handler on the application context
+ externalSystem.onEvent(event -> {
+ context.runOnContext(v -> handler.handle(event));
+ });
+}
+```
+
+In practice, many Vert.x APIs and third-party libraries are implemented this way to ensure callbacks execute on the correct event loop thread.
+
+## The Pattern
+
+```java
+public void handle(RoutingContext context) {
+ Context originCtx = Vertx.currentContext(); // 1. Capture context at handler entry
+
+ externalAsyncOperation()
+ .onSuccess(result -> {
+ originCtx.runOnContext(v -> { // 2. Restore context before touching request
+ context.next();
+ });
+ })
+ .onFailure(cause -> {
+ originCtx.runOnContext(v -> { // 3. Restore in ALL branches
+ context.fail(500, cause);
+ });
+ });
+}
+```
+
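The sketch below is not part of this patch. It reproduces the capture-and-dispatch pattern with JDK classes only, so it can be run without Vert.x on the classpath. The single-thread executor named `fake-eventloop` is an assumed stand-in for an event loop context, `eventLoop.execute(...)` plays the role of `originCtx.runOnContext(...)`, and every class and thread name here is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DispatchBackSketch {
    /** Returns {origin thread name, name of the thread that ran the continuation}. */
    public static String[] run() throws Exception {
        // Stand-in for a Vert.x event loop: one thread, tasks run in submission order.
        ExecutorService eventLoop =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-eventloop"));
        // Stand-in for a library that completes futures on its own thread.
        ExecutorService external =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "external-pool"));
        try {
            CompletableFuture<String> externalResult = new CompletableFuture<>();
            CompletableFuture<String[]> observed = new CompletableFuture<>();

            // "Handler entry": runs on the fake event loop and remembers where it started.
            eventLoop.execute(() -> {
                String origin = Thread.currentThread().getName();
                // The thenAccept callback may run on whichever thread completes the future,
                // so it only re-submits the real work to the origin executor.
                externalResult.thenAccept(value ->
                    eventLoop.execute(() ->
                        observed.complete(new String[] {origin, Thread.currentThread().getName()})));
            });

            external.execute(() -> externalResult.complete("done"));
            return observed.get(5, TimeUnit.SECONDS);
        } finally {
            eventLoop.shutdown();
            external.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] names = run();
        System.out.println("origin=" + names[0] + ", continuation=" + names[1]);
    }
}
```

Both names come back as `fake-eventloop` regardless of whether the future completes before or after the callback is registered, which is the property the pattern is meant to guarantee.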
+## When You Need This Pattern
+
+**Required** when handling HTTP requests and integrating external async APIs:
+
+- **Caffeine Cache** with `Future` values - even synchronous `Cache<K, Future<V>>`
+ - Cached futures complete on different threads depending on when they were created
+ - Shared futures from concurrent requests can jump threads
+- **AWS SDK** - `CompletableFuture` completes on SDK thread pool
+- **Any `CompletableFuture`-based library** that you use in request handlers
+
+**Not required** for:
+- **Vert.x native async operations** (already maintain context): `vertx.executeBlocking()`, `vertx.fileSystem().*`, `WebClient`, timers, event bus
+- **Background tasks** not part of request handling (e.g., periodic maintenance, metrics collection, async cleanup tasks)
+
+If you're not accessing request state (`RoutingContext`, `HttpServerRequest`, response), you don't need to dispatch back to the original event-loop thread.
+
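As an illustration of why shared futures are on the "required" list, the following sketch (not part of this patch, JDK only) registers a continuation from one thread and completes the future from another. The executors `fake-eventloop` and `external-pool` are assumed stand-ins for an event loop and a library thread pool, and all names are hypothetical. The latch only makes the ordering deterministic for the demonstration: the callback is registered before the future completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedFutureHopSketch {
    /** Returns {thread that registered the callback, thread that ran the callback}. */
    public static String[] run() throws Exception {
        ExecutorService eventLoop =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-eventloop"));
        ExecutorService external =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "external-pool"));
        try {
            // A future that several requests could share, e.g. a cached authorization result.
            CompletableFuture<Boolean> shared = new CompletableFuture<>();
            CompletableFuture<String[]> observed = new CompletableFuture<>();
            CountDownLatch registered = new CountDownLatch(1);

            eventLoop.execute(() -> {
                String origin = Thread.currentThread().getName();
                // No dispatch back to the origin executor: the callback runs wherever
                // the future happens to be completed.
                shared.thenAccept(allowed ->
                    observed.complete(new String[] {origin, Thread.currentThread().getName()}));
                registered.countDown();
            });

            if (!registered.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback was not registered in time");
            }
            // Complete the future from a different thread after registration.
            external.execute(() -> shared.complete(true));
            return observed.get(5, TimeUnit.SECONDS);
        } finally {
            eventLoop.shutdown();
            external.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] names = run();
        System.out.println("registered on " + names[0] + ", ran on " + names[1]);
    }
}
```

Here the callback is registered on `fake-eventloop` but runs on `external-pool`. In a request handler, that second thread would be the one touching request state unless the continuation is dispatched back to the captured context.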
+## References
+
+- CASSSIDECAR-368: Fix request execution continues on wrong thread
+- Vert.x Dealing with Contexts: https://vertx.io/docs/guides/advanced-vertx-guide/#_dealing_with_contexts
+- Vert.x Event Loop Context: https://vertx.io/docs/guides/advanced-vertx-guide/#_event_loop_context
+- Vert.x Golden Rule: https://vertx.io/docs/vertx-core/java/#golden_rule
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/acl/authorization/RoleBasedAuthorizationIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/acl/authorization/RoleBasedAuthorizationIntegrationTest.java
index 6c43cf4f..7fff03e3 100644
---
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/acl/authorization/RoleBasedAuthorizationIntegrationTest.java
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/acl/authorization/RoleBasedAuthorizationIntegrationTest.java
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.AuthenticationException;
-import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
@@ -909,8 +909,8 @@ class RoleBasedAuthorizationIntegrationTest extends
SharedClusterSidecarIntegrat
private void invalidateAuthorizationHandlerCaches()
{
CacheFactory factory =
serverWrapper.injector.getInstance(CacheFactory.class);
- AsyncCache<AuthorizationCacheKey, Future<Boolean>> authorizationCache
= factory.endpointAuthorizationCache();
- authorizationCache.synchronous().invalidateAll();
+ Cache<AuthorizationCacheKey, Future<Boolean>> authorizationCache =
factory.endpointAuthorizationCache();
+ authorizationCache.invalidateAll();
}
private void verifyAccess(HttpMethod method, String testRoute, Path
clientKeystorePath, Verifier<HttpResponse<Buffer>> assertions)
diff --git
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/InvalidateCacheIntegrationTest.java
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/InvalidateCacheIntegrationTest.java
index 3ef21325..71ae2f62 100644
---
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/InvalidateCacheIntegrationTest.java
+++
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/InvalidateCacheIntegrationTest.java
@@ -29,7 +29,7 @@ import org.junit.jupiter.api.Test;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.Session;
-import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.github.benmanes.caffeine.cache.Cache;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
@@ -348,34 +348,34 @@ class InvalidateCacheIntegrationTest extends
SharedClusterSidecarIntegrationTest
void testInvalidateEndpointAuthorizationCache()
{
CacheFactory cacheFactory =
serverWrapper.injector.getInstance(CacheFactory.class);
- AsyncCache<AuthorizationCacheKey, Future<Boolean>>
endpointAuthorizationCache = cacheFactory.endpointAuthorizationCache();
+ Cache<AuthorizationCacheKey, Future<Boolean>>
endpointAuthorizationCache = cacheFactory.endpointAuthorizationCache();
// Clear the cache first to ensure clean state
String clearCacheRoute =
String.format(CACHE_INVALIDATE_ROUTE_TEMPLATE,
CacheFactory.ENDPOINT_AUTHORIZATION_CACHE_NAME);
verifyAccess(HttpMethod.DELETE, clearCacheRoute,
superuserKeystorePath, assertStatus(HttpResponseStatus.OK));
- loopAssert(3, () ->
assertThat(endpointAuthorizationCache.synchronous().asMap()).isEmpty());
+ loopAssert(3, () ->
assertThat(endpointAuthorizationCache.asMap()).isEmpty());
verifyAccess(HttpMethod.GET, SCHEMA_ROUTE, testUserKeystorePath,
assertStatus(HttpResponseStatus.OK));
- loopAssert(3, () ->
assertThat(endpointAuthorizationCache.synchronous().asMap()).isNotEmpty());
+ loopAssert(3, () ->
assertThat(endpointAuthorizationCache.asMap()).isNotEmpty());
// Invalidate cache and verify
String invalidateCacheRoute =
String.format(CACHE_INVALIDATE_ROUTE_TEMPLATE,
CacheFactory.ENDPOINT_AUTHORIZATION_CACHE_NAME);
verifyAccess(HttpMethod.DELETE, invalidateCacheRoute,
superuserKeystorePath, assertStatus(HttpResponseStatus.OK));
- loopAssert(3, () ->
assertThat(endpointAuthorizationCache.synchronous().asMap()).isEmpty());
+ loopAssert(3, () ->
assertThat(endpointAuthorizationCache.asMap()).isEmpty());
// Re-populate cache with test user and verify test user is back in
cache
verifyAccess(HttpMethod.GET, SCHEMA_ROUTE, testUserKeystorePath,
assertStatus(HttpResponseStatus.OK));
- loopAssert(3, () ->
assertThat(endpointAuthorizationCache.synchronous().asMap()).isNotEmpty());
+ loopAssert(3, () ->
assertThat(endpointAuthorizationCache.asMap()).isNotEmpty());
}
@Test
void testInvalidateEndpointAuthorizationCacheWithKeys()
{
CacheFactory cacheFactory =
serverWrapper.injector.getInstance(CacheFactory.class);
- AsyncCache<AuthorizationCacheKey, Future<Boolean>>
endpointAuthorizationCache = cacheFactory.endpointAuthorizationCache();
+ Cache<AuthorizationCacheKey, Future<Boolean>>
endpointAuthorizationCache = cacheFactory.endpointAuthorizationCache();
verifyKeyBasedInvalidationNotSupported(CacheFactory.ENDPOINT_AUTHORIZATION_CACHE_NAME,
- () ->
endpointAuthorizationCache.synchronous().asMap(),
+ () ->
endpointAuthorizationCache.asMap(),
testUserKeystorePath);
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/CachedAuthorizationHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/CachedAuthorizationHandler.java
index 7ab115ec..a703b06d 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/CachedAuthorizationHandler.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/CachedAuthorizationHandler.java
@@ -30,7 +30,8 @@ import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.github.benmanes.caffeine.cache.Cache;
+import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.ext.auth.User;
@@ -47,7 +48,6 @@ import
org.apache.cassandra.sidecar.metrics.server.AuthMetrics;
import org.jetbrains.annotations.VisibleForTesting;
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static io.vertx.core.Future.fromCompletionStage;
import static org.apache.cassandra.sidecar.utils.AuthUtils.extractIdentities;
/**
@@ -64,7 +64,7 @@ public class CachedAuthorizationHandler implements
AuthorizationHandler
private final AccessControlConfiguration accessControlConfiguration;
private final AdminIdentityResolver adminIdentityResolver;
private final AuthMetrics authMetrics;
- private final AsyncCache<AuthorizationCacheKey, Future<Boolean>>
authorizationCache;
+ private final Cache<AuthorizationCacheKey, Future<Boolean>>
authorizationCache;
protected final Authorization authorization;
protected final Collection<AuthorizationProvider> authorizationProviders;
@@ -74,7 +74,7 @@ public class CachedAuthorizationHandler implements
AuthorizationHandler
AdminIdentityResolver
adminIdentityResolver,
Authorization authorization,
SidecarMetrics sidecarMetrics,
- AsyncCache<AuthorizationCacheKey,
Future<Boolean>> authorizationCache)
+ Cache<AuthorizationCacheKey,
Future<Boolean>> authorizationCache)
{
this(HANDLER_ID_GEN.getAndIncrement(), accessControlConfiguration,
adminIdentityResolver,
authorization, sidecarMetrics, authorizationCache);
@@ -86,7 +86,7 @@ public class CachedAuthorizationHandler implements
AuthorizationHandler
AdminIdentityResolver
adminIdentityResolver,
Authorization authorization,
SidecarMetrics sidecarMetrics,
- AsyncCache<AuthorizationCacheKey,
Future<Boolean>> authorizationCache)
+ Cache<AuthorizationCacheKey,
Future<Boolean>> authorizationCache)
{
this.authorization = Objects.requireNonNull(authorization,
"authorization cannot be null");
this.authorizationProviders = new ArrayList<>();
@@ -123,22 +123,27 @@ public class CachedAuthorizationHandler implements
AuthorizationHandler
variableHandler.accept(context, authorizationContext);
}
+ Context originCtx = context.vertx().getOrCreateContext();
checkAuthorization(authorizationContext, startTimeNanos)
.onSuccess(ignored -> {
- if (!context.request().isEnded())
- {
- context.request().resume();
- }
- context.next();
+ originCtx.runOnContext(v -> {
+ if (!context.request().isEnded())
+ {
+ context.request().resume();
+ }
+ context.next();
+ });
})
.onFailure(cause -> {
LOGGER.error("Authorization failed for user='{}'", user,
cause);
- // resume as the error handler may allow this request to
become valid again
- if (!context.request().isEnded())
- {
- context.request().resume();
- }
- context.fail(FORBIDDEN.code(), cause);
+ originCtx.runOnContext(v -> {
+ // resume as the error handler may allow this request to become valid again
+ if (!context.request().isEnded())
+ {
+ context.request().resume();
+ }
+ context.fail(FORBIDDEN.code(), cause);
+ });
});
}
catch (Throwable e)
@@ -176,8 +181,10 @@ public class CachedAuthorizationHandler implements
AuthorizationHandler
}
AuthorizationCacheKey key = AuthorizationCacheKey.create(handlerId,
authorizationContext);
- return fromCompletionStage(authorizationCache.get(key, k ->
authorizeUser(authorizationContext, startTimeNanos)))
- .compose(future -> future);
+ // Cache.get() returns the cached Future<Boolean> directly. This is simpler than AsyncCache which
+ // would wrap the result in a CompletableFuture, requiring unwrapping via compose(future -> future).
+ // The synchronous Cache API provides thread-safe deduplication of concurrent loads for the same key.
+ return authorizationCache.get(key, k -> authorizeUser(authorizationContext, startTimeNanos));
}
protected Future<Boolean> authorizeUser(AuthorizationContext
authorizationContext, long startTimeNanos)
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/InvalidateCacheHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/InvalidateCacheHandler.java
index 5efb14d7..624d74d6 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/InvalidateCacheHandler.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/InvalidateCacheHandler.java
@@ -21,7 +21,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
-import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.github.benmanes.caffeine.cache.Cache;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.netty.handler.codec.http.HttpResponseStatus;
@@ -63,7 +63,7 @@ public class InvalidateCacheHandler extends
AbstractHandler<InvalidateCacheHandl
private final IdentityToRoleCache identityToRoleCache;
private final RoleAuthorizationsCache roleAuthorizationsCache;
private final SuperUserCache superUserCache;
- private final AsyncCache<AuthorizationCacheKey, Future<Boolean>>
endpointAuthorizationCache;
+ private final Cache<AuthorizationCacheKey, Future<Boolean>>
endpointAuthorizationCache;
@Inject
public InvalidateCacheHandler(InstanceMetadataFetcher metadataFetcher,
@@ -126,7 +126,7 @@ public class InvalidateCacheHandler extends
AbstractHandler<InvalidateCacheHandl
throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
"endpoint_authorization_cache does
not support selective key invalidation");
}
- endpointAuthorizationCache.synchronous().invalidateAll();
+ endpointAuthorizationCache.invalidateAll();
break;
default:
throw wrapHttpException(HttpResponseStatus.NOT_FOUND,
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/routes/RouteBuilder.java
b/server/src/main/java/org/apache/cassandra/sidecar/routes/RouteBuilder.java
index 3b2b31f0..8401ea16 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/routes/RouteBuilder.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/RouteBuilder.java
@@ -28,7 +28,7 @@ import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
-import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.github.benmanes.caffeine.cache.Cache;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpMethod;
@@ -68,7 +68,7 @@ public class RouteBuilder
private final AdminIdentityResolver adminIdentityResolver;
private final AuthorizationParameterValidateHandler
authZParameterValidateHandler;
private final SidecarMetrics sidecarMetrics;
- private final AsyncCache<AuthorizationCacheKey, Future<Boolean>>
authorizationCache;
+ private final Cache<AuthorizationCacheKey, Future<Boolean>>
authorizationCache;
private boolean setBodyHandler;
private boolean accessProtected = true;
private final List<Handler<RoutingContext>> handlers = new ArrayList<>();
@@ -235,14 +235,14 @@ public class RouteBuilder
private final AdminIdentityResolver adminIdentityResolver;
private final AuthorizationParameterValidateHandler
authZParameterValidateHandler;
private final SidecarMetrics sidecarMetrics;
- private final AsyncCache<AuthorizationCacheKey, Future<Boolean>>
authorizationCache;
+ private final Cache<AuthorizationCacheKey, Future<Boolean>>
authorizationCache;
public Factory(AccessControlConfiguration accessControlConfiguration,
AuthorizationProvider authorizationProvider,
AdminIdentityResolver adminIdentityResolver,
AuthorizationParameterValidateHandler
authZParameterValidateHandler,
SidecarMetrics sidecarMetrics,
- AsyncCache<AuthorizationCacheKey, Future<Boolean>>
authorizationCache)
+ Cache<AuthorizationCacheKey, Future<Boolean>>
authorizationCache)
{
this.accessControlConfiguration = accessControlConfiguration;
this.authorizationProvider = authorizationProvider;
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/utils/CacheFactory.java
b/server/src/main/java/org/apache/cassandra/sidecar/utils/CacheFactory.java
index 410c45a4..30f248d5 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/utils/CacheFactory.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/CacheFactory.java
@@ -22,7 +22,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
@@ -48,7 +47,7 @@ public class CacheFactory
public static final String ENDPOINT_AUTHORIZATION_CACHE_NAME =
"endpoint_authorization_cache";
private final Cache<SSTableImporter.ImportOptions, Future<Void>>
ssTableImportCache;
- private final AsyncCache<AuthorizationCacheKey, Future<Boolean>>
endpointAuthorizationCache;
+ private final Cache<AuthorizationCacheKey, Future<Boolean>>
endpointAuthorizationCache;
@Inject
public CacheFactory(SidecarConfiguration configuration,
@@ -80,7 +79,7 @@ public class CacheFactory
/**
* @return the cache used for authorization requests
*/
- public AsyncCache<AuthorizationCacheKey, Future<Boolean>>
endpointAuthorizationCache()
+ public Cache<AuthorizationCacheKey, Future<Boolean>>
endpointAuthorizationCache()
{
return endpointAuthorizationCache;
}
@@ -119,15 +118,24 @@ public class CacheFactory
/**
* Initializes the Authorization Cache using the provided {@code
configuration}. We want to create only one
* instance of authorization cache.
+ * <p>
+ * This cache stores {@code Future<Boolean>} values representing
asynchronous authorization check results.
+ * The cache uses Caffeine's thread-safe {@code get(key, mappingFunction)}
which ensures that for concurrent
+ * requests with the same authorization key, only one authorization check
is performed, and all requesters
+ * receive the same Future instance.
+ * <p>
+ * Note: We use a synchronous {@link Cache} rather than {@link
com.github.benmanes.caffeine.cache.AsyncCache}
+ * to avoid double-wrapping of Future values and to maintain better
control over Vert.x context switching
+ * (see CASSSIDECAR-368).
*
* @param sidecarConfiguration the Sidecar configuration
* @param sidecarMetrics the Sidecar metrics registry
* @param ticker the ticker for the cache
- * @return instance of {@link AsyncCache} for caching authorization
requests
+ * @return instance of {@link Cache} for caching authorization requests
with {@code Future<Boolean>} values
*/
- private AsyncCache<AuthorizationCacheKey, Future<Boolean>>
initEndpointAuthorizationCache(SidecarConfiguration sidecarConfiguration,
-
SidecarMetrics sidecarMetrics,
-
Ticker ticker)
+ private Cache<AuthorizationCacheKey, Future<Boolean>>
initEndpointAuthorizationCache(SidecarConfiguration sidecarConfiguration,
+
SidecarMetrics sidecarMetrics,
+
Ticker ticker)
{
if (!sidecarConfiguration.accessControlConfiguration().enabled()
||
!sidecarConfiguration.accessControlConfiguration().permissionCacheConfiguration().enabled())
@@ -149,6 +157,6 @@ public class CacheFactory
permissionCacheConfig.expireAfterAccess().unit())
.maximumSize(permissionCacheConfig.maximumSize())
.recordStats(() ->
sidecarMetrics.server().cache().authorizationCacheMetrics)
- .buildAsync();
+ .build();
}
}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/VertxContextTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/VertxContextTest.java
new file mode 100644
index 00000000..3778c410
--- /dev/null
+++ b/server/src/test/java/org/apache/cassandra/sidecar/VertxContextTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import io.vertx.core.Context;
+import io.vertx.core.Vertx;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test cases demonstrating the Vert.x threading model, especially thread switching
+ * with shared futures.
+ * <p>
+ * These tests illustrate the concepts from docs/vertx-threading-model.md
+ */
+@Disabled("Excluded from CI. The test class is for demonstration purpose. Comment me out to run the tests")
+public class VertxContextTest
+{
+ private Vertx vertx;
+ private ExecutorService externalExecutor;
+
+ @BeforeEach
+ void setUp()
+ {
+ vertx = Vertx.vertx();
+ externalExecutor = Executors.newFixedThreadPool(2);
+ }
+
+ @AfterEach
+ void tearDown()
+ {
+ TestResourceReaper.create().with(vertx).with(externalExecutor::shutdown).close();
+ }
+
+ /**
+ * Demonstrates basic context capture and verification that code executes
+ * on the expected event loop thread.
+ * <p>
+ * This test shows:
+ * - How to capture the current Vert.x context
+ * - How to verify you're running on an event loop thread
+ * - That each event loop has a unique context
+ */
+ @Test
+ void testBasicContextCapture() throws Exception
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<Context> capturedContext = new AtomicReference<>();
+ AtomicReference<Thread> capturedThread = new AtomicReference<>();
+
+ vertx.runOnContext(v -> {
+ // Capture context and thread when running on event loop
+ capturedContext.set(Vertx.currentContext());
+ capturedThread.set(Thread.currentThread());
+
+ // Verify we're on an event loop thread
+ assertThat(capturedContext.get()).isNotNull();
+ assertThat(capturedContext.get().isEventLoopContext()).isTrue();
+ assertThat(Thread.currentThread().getName()).startsWith("vert.x-eventloop-thread");
+
+ latch.countDown();
+ });
+
+ assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
+ assertThat(capturedContext.get()).isNotNull();
+ assertThat(capturedThread.get().getName()).startsWith("vert.x-eventloop-thread");
+ }
+
+ /**
+ * Demonstrates the BROKEN pattern: thread switching with shared CompletableFuture.
+ * <p>
+ * This test illustrates the problem from CASSSIDECAR-368:
+ * - Request starts on event-loop-A
+ * - Shared CompletableFuture completes on external thread or different event loop
+ * - Continuation executes on the completing thread (NOT event-loop-A)
+ * - This violates the Golden Rule and can cause race conditions
+ */
+ @Test
+ void testSharedFutureThreadSwitching() throws Exception
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<Thread> originalThread = new AtomicReference<>();
+ AtomicReference<Thread> continuationThread = new AtomicReference<>();
+ AtomicReference<Context> originalContext = new AtomicReference<>();
+ AtomicReference<Context> continuationContext = new AtomicReference<>();
+
+ // Simulate a shared future that multiple requests might access
+ // (like a cached authentication future)
+ CompletableFuture<String> sharedFuture = new CompletableFuture<>();
+
+ // Complete the future from an external thread (simulating external async operation)
+ externalExecutor.submit(() -> {
+ try
+ {
+ Thread.sleep(50); // Simulate async work
+ sharedFuture.complete("auth-token");
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ });
+
+ // Simulate request handler on event loop A
+ vertx.runOnContext(v -> {
+ originalThread.set(Thread.currentThread());
+ originalContext.set(Vertx.currentContext());
+
+ // BROKEN: Continuation runs on whichever thread completes the future
+ sharedFuture.thenAccept(token -> {
+ continuationThread.set(Thread.currentThread());
+ continuationContext.set(Vertx.currentContext());
+
+ // This demonstrates the problem: we've switched threads!
+ assertThat(continuationThread.get()).isNotEqualTo(originalThread.get());
+
+ // The context might also be different or null
+ // This is the core issue that caused CASSSIDECAR-368
+ latch.countDown();
+ });
+ });
+
+ assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
+
+ // Verify thread switching occurred
+ assertThat(originalThread.get()).isNotNull();
+ assertThat(continuationThread.get()).isNotNull();
+ assertThat(originalThread.get().getName()).startsWith("vert.x-eventloop-thread");
+ // The continuation thread is NOT the same as original
+ assertThat(continuationThread.get()).isNotEqualTo(originalThread.get());
+ }
+
+ /**
+ * Demonstrates the FIXED pattern: proper context restoration with runOnContext.
+ * <p>
+ * This test shows the correct approach and the core fix for CASSSIDECAR-368:
+ * - Capture the original context at request entry
+ * - When external async operation completes, dispatch back to original context
+ * - All request handling code runs on the same event loop thread
+ */
+ @Test
+ void testProperContextRestoration() throws Exception
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<Thread> originalThread = new AtomicReference<>();
+ AtomicReference<Thread> restoredThread = new AtomicReference<>();
+ AtomicReference<Context> originalContext = new AtomicReference<>();
+ AtomicReference<Context> restoredContext = new AtomicReference<>();
+
+ // Shared future that completes on external thread
+ CompletableFuture<String> sharedFuture = new CompletableFuture<>();
+
+ externalExecutor.submit(() -> {
+ try
+ {
+ Thread.sleep(50);
+ sharedFuture.complete("auth-token");
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ });
+
+ // Simulate request handler on event loop
+ vertx.runOnContext(v -> {
+ // Step 1: Capture context at handler entry
+ Context originCtx = Vertx.currentContext();
+ originalThread.set(Thread.currentThread());
+ originalContext.set(originCtx);
+
+ // Step 2: When external async completes, restore context before touching request
+ sharedFuture.thenAccept(token -> {
+ originCtx.runOnContext(ignored -> {
+ // Now we're back on the original event loop thread
+ restoredThread.set(Thread.currentThread());
+ restoredContext.set(Vertx.currentContext());
+
+ // FIXED: Same thread and context as original
+ assertThat(restoredThread.get()).isEqualTo(originalThread.get());
+ assertThat(restoredContext.get()).isEqualTo(originalContext.get());
+
+ latch.countDown();
+ });
+ });
+ });
+
+ assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
+
+ // Verify we stayed on the same thread throughout request handling
+ assertThat(originalThread.get()).isNotNull();
+ assertThat(restoredThread.get()).isNotNull();
+ assertThat(originalThread.get()).isEqualTo(restoredThread.get());
+ assertThat(originalContext.get()).isEqualTo(restoredContext.get());
+ }
+
+ /**
+ * Demonstrates multiple sequential async operations with proper context restoration.
+ * <p>
+ * This test shows:
+ * - Context must be restored after EACH external async operation
+ * - The pattern works for chained operations
+ * - All request handling code stays on the same event loop thread
+ */
+ @Test
+ void testMultipleAsyncOperationsWithContextRestoration() throws Exception
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<Thread> originalThread = new AtomicReference<>();
+ AtomicReference<Thread> afterFirstOp = new AtomicReference<>();
+ AtomicReference<Thread> afterSecondOp = new AtomicReference<>();
+
+ vertx.runOnContext(v -> {
+ Context originCtx = Vertx.currentContext();
+ originalThread.set(Thread.currentThread());
+
+ // First external async operation
+ CompletableFuture<String> firstOp = CompletableFuture.supplyAsync(
+ () -> "first-result",
+ externalExecutor
+ );
+
+ firstOp.thenAccept(firstResult -> {
+ // Restore context after first operation
+ originCtx.runOnContext(ignored1 -> {
+ afterFirstOp.set(Thread.currentThread());
+
+ // Second external async operation
+ CompletableFuture<String> secondOp = CompletableFuture.supplyAsync(
+ () -> "second-result",
+ externalExecutor
+ );
+
+ secondOp.thenAccept(secondResult -> {
+ // Restore context after second operation
+ originCtx.runOnContext(ignored2 -> {
+ afterSecondOp.set(Thread.currentThread());
+
+ // All operations executed on the same event loop thread
+ assertThat(afterFirstOp.get()).isEqualTo(originalThread.get());
+ assertThat(afterSecondOp.get()).isEqualTo(originalThread.get());
+
+ latch.countDown();
+ });
+ });
+ });
+ });
+ });
+
+ assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
+
+ // Verify all operations stayed on the same thread
+ assertThat(originalThread.get()).isNotNull();
+ assertThat(afterFirstOp.get()).isEqualTo(originalThread.get());
+ assertThat(afterSecondOp.get()).isEqualTo(originalThread.get());
+ }
+}
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/acl/authorization/CachedAuthorizationHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/acl/authorization/CachedAuthorizationHandlerTest.java
index 99c1c482..184f821e 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/acl/authorization/CachedAuthorizationHandlerTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/acl/authorization/CachedAuthorizationHandlerTest.java
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.ext.auth.User;
import io.vertx.ext.auth.authorization.AndAuthorization;
@@ -33,6 +34,7 @@ import io.vertx.ext.auth.authorization.Authorization;
import io.vertx.ext.auth.authorization.AuthorizationProvider;
import io.vertx.ext.auth.authorization.PermissionBasedAuthorization;
import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.TestResourceReaper;
import org.apache.cassandra.sidecar.acl.AdminIdentityResolver;
import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
@@ -74,10 +76,12 @@ class CachedAuthorizationHandlerTest
private Authorization testAuthorization;
private RouteBuilder.Factory routeBuilderFactory;
private SSTableImporter sstableImporter;
+ private Vertx vertx;
@BeforeEach
void setUp()
{
+ vertx = Vertx.vertx();
mockAccessControlConfig = mock(AccessControlConfiguration.class);
AuthorizationParameterValidateHandler mockValidateHandler = mock(AuthorizationParameterValidateHandler.class);
mockAdminIdentityResolver = mock(AdminIdentityResolver.class);
@@ -118,6 +122,7 @@ class CachedAuthorizationHandlerTest
@AfterEach
void tearDown()
{
+ TestResourceReaper.create().with(vertx).close();
registry().removeMatching((name, metric) -> true);
}
@@ -201,7 +206,8 @@ class CachedAuthorizationHandlerTest
handler.handle(mockContext2);
}
- verify(mockContext2, times(5)).next();
+ // handle is async in handler. There could be a slight delay on showing a total of 5 invocations.
+ loopAssert(1, () -> verify(mockContext2, times(5)).next());
// Verify cache hit for mockContext2
CacheStats multipleCallStats = metrics.server().cache().authorizationCacheMetrics.snapshot();
@@ -496,6 +502,7 @@ class CachedAuthorizationHandlerTest
when(mockServerRequest.pause()).thenReturn(mockServerRequest);
when(mockServerRequest.resume()).thenReturn(mockServerRequest);
when(mockContext.request()).thenReturn(mockServerRequest);
+ when(mockContext.vertx()).thenReturn(vertx);
User mockUser = createMockUser(username, identities, roles);
when(mockContext.user()).thenReturn(mockUser);
@@ -544,7 +551,7 @@ class CachedAuthorizationHandlerTest
if (success)
{
- loopAssert(2, 100, () -> verify(mockContext).next());
+ loopAssert(2, 100, () -> verify(mockContext, times(1)).next());
}
else
{
@@ -556,25 +563,26 @@ class CachedAuthorizationHandlerTest
assertThat(firstCallStats.missCount()).isEqualTo(1);
assertThat(firstCallStats.hitCount()).isEqualTo(0);
- for (int i = 0; i < 5; i++)
+ int totalInvocationCount = 5;
+ for (int invocationCount = 1; invocationCount <= totalInvocationCount; invocationCount++)
{
// Reset failed state before each subsequent call to allow handler to process
when(mockContext.failed()).thenReturn(false);
+ int expectedTimes = invocationCount + 1;
handler.handle(mockContext);
- }
-
- if (success)
- {
- loopAssert(2, 100, () -> verify(mockContext, times(6)).next());
- }
- else
- {
- loopAssert(2, 100, () -> verify(mockContext, times(6)).fail(eq(statusCode), any(Throwable.class)));
+ if (success)
+ {
+ loopAssert(2, 100, () -> verify(mockContext, times(expectedTimes)).next());
+ }
+ else
+ {
+ loopAssert(2, 100, () -> verify(mockContext, times(expectedTimes)).fail(eq(statusCode), any(Throwable.class)));
+ }
}
// Verify cache hit on subsequent requests
CacheStats multipleCallStats = metrics.server().cache().authorizationCacheMetrics.snapshot();
assertThat(multipleCallStats.missCount()).isEqualTo(0);
- assertThat(multipleCallStats.hitCount()).isEqualTo(5);
+ assertThat(multipleCallStats.hitCount()).isEqualTo(totalInvocationCount);
}
}
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/handlers/InvalidateCacheHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/handlers/InvalidateCacheHandlerTest.java
index b386232e..89f700cf 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/handlers/InvalidateCacheHandlerTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/handlers/InvalidateCacheHandlerTest.java
@@ -30,7 +30,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Cache;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
@@ -79,15 +78,11 @@ public class InvalidateCacheHandlerTest
IdentityToRoleCache mockIdentityToRoleCache = mock(IdentityToRoleCache.class);
RoleAuthorizationsCache mockRoleAuthorizationsCache = mock(RoleAuthorizationsCache.class);
SuperUserCache mockSuperUserCache = mock(SuperUserCache.class);
- AsyncCache<AuthorizationCacheKey, Future<Boolean>> mockEndpointAuthorizationCache = mock(AsyncCache.class);
- Cache<AuthorizationCacheKey, Future<Boolean>> mockSyncCache = mock(Cache.class);
+ Cache<AuthorizationCacheKey, Future<Boolean>> mockEndpointAuthorizationCache = mock(Cache.class);
@BeforeEach
void before() throws InterruptedException
{
- // Stub the AsyncCache.synchronous() to return the sync cache mock
- when(mockEndpointAuthorizationCache.synchronous()).thenReturn(mockSyncCache);
-
Injector injector;
Module testOverride = Modules.override(new TestModule())
.with(new InvalidateCacheTestModule());
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/utils/CacheFactoryTest.java b/server/src/test/java/org/apache/cassandra/sidecar/utils/CacheFactoryTest.java
index f6450300..bc3e1c48 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/utils/CacheFactoryTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/utils/CacheFactoryTest.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.sidecar.utils;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -30,7 +29,6 @@ import com.google.common.testing.FakeTicker;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Cache;
import io.vertx.core.Future;
import io.vertx.core.MultiMap;
@@ -217,7 +215,7 @@ class CacheFactoryTest
@Test
void testEndpointAuthorizationCacheExpiration() throws ExecutionException, InterruptedException
{
- AsyncCache<AuthorizationCacheKey, Future<Boolean>> cache = cacheFactory.endpointAuthorizationCache();
+ Cache<AuthorizationCacheKey, Future<Boolean>> cache = cacheFactory.endpointAuthorizationCache();
AuthorizationCacheKey key1
= createAuthorizationCacheKey(1, "user1", List.of("role1"), "ks1", "tbl1");
AuthorizationCacheKey key2
@@ -260,7 +258,7 @@ class CacheFactoryTest
@Test
void testEndpointAuthorizationCacheDifferentKeys() throws ExecutionException, InterruptedException
{
- AsyncCache<AuthorizationCacheKey, Future<Boolean>> cache = cacheFactory.endpointAuthorizationCache();
+ Cache<AuthorizationCacheKey, Future<Boolean>> cache = cacheFactory.endpointAuthorizationCache();
// Different handler IDs
AuthorizationCacheKey key1
@@ -343,12 +341,12 @@ class CacheFactoryTest
return AuthorizationCacheKey.create(handlerId, authContext);
}
- private Boolean authorizationCacheEntry(AsyncCache<AuthorizationCacheKey, Future<Boolean>> cache,
+ private Boolean authorizationCacheEntry(Cache<AuthorizationCacheKey, Future<Boolean>> cache,
AuthorizationCacheKey key,
Boolean value) throws ExecutionException, InterruptedException
{
- CompletableFuture<Future<Boolean>> future = cache.get(key, k -> Future.succeededFuture(value));
+ Future<Boolean> future = cache.get(key, k -> Future.succeededFuture(value));
assertThat(future).isNotNull();
- return future.get().toCompletionStage().toCompletableFuture().get();
+ return future.toCompletionStage().toCompletableFuture().get();
}
}
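
The thread-switching behaviour that VertxContextTest exercises can be reproduced without Vert.x. The following is a minimal JDK-only sketch, not code from this commit: a single-thread executor stands in for one event loop, and the class and method names (ContextRestorationSketch, continuationStaysOnLoop) are hypothetical. It registers a callback on a shared CompletableFuture from the "event loop" thread, completes the future from a second thread, and reports whether the callback body ran on the registering thread, with and without re-dispatching to the original executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only analogue of the pattern; not part of the Sidecar code base.
class ContextRestorationSketch
{
    /**
     * Returns true when the continuation ran on the thread that registered it.
     *
     * @param restore whether to dispatch back to the originating executor first
     */
    static boolean continuationStaysOnLoop(boolean restore) throws Exception
    {
        // Assumption: a single-thread executor approximates one Vert.x event loop.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        ExecutorService external = Executors.newSingleThreadExecutor();
        try
        {
            CompletableFuture<String> shared = new CompletableFuture<>();
            CompletableFuture<Thread> origin = new CompletableFuture<>();
            CompletableFuture<Thread> continuation = new CompletableFuture<>();
            CountDownLatch registered = new CountDownLatch(1);

            eventLoop.execute(() -> {
                origin.complete(Thread.currentThread());
                shared.thenAccept(token -> {
                    if (restore)
                    {
                        // Hop back to the originating executor before doing any work
                        eventLoop.execute(() -> continuation.complete(Thread.currentThread()));
                    }
                    else
                    {
                        // Runs on whichever thread completed the shared future
                        continuation.complete(Thread.currentThread());
                    }
                });
                registered.countDown();
            });

            // Complete the future only after the callback is registered, so the
            // callback is always invoked by the completing (external) thread.
            if (!registered.await(5, TimeUnit.SECONDS))
            {
                throw new IllegalStateException("callback was not registered in time");
            }
            external.execute(() -> shared.complete("auth-token"));

            return continuation.get(5, TimeUnit.SECONDS) == origin.get(5, TimeUnit.SECONDS);
        }
        finally
        {
            eventLoop.shutdown();
            external.shutdown();
        }
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println("without restoration: " + continuationStaysOnLoop(false));
        System.out.println("with restoration: " + continuationStaysOnLoop(true));
    }
}
```

The latch is there so the sketch does not depend on timing: if the shared future were already complete when thenAccept is called, the callback would run inline on the registering thread and hide the thread switch.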