This is an automated email from the ASF dual-hosted git repository.
absurdfarce pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/cassandra-java-driver.git
The following commit(s) were added to refs/heads/4.x by this push:
new 8f69c24d6 CASSJAVA-116: Retry or Speculative Execution with
RequestIdGenerator throws "Duplicate Key" patch by Jane He; reviewed by Andy
Tolbert and Lukasz Atoniak for CASSJAVA-116
8f69c24d6 is described below
commit 8f69c24d6cd6db75c6811fc32ee9ab39121ecaa6
Author: janehe <[email protected]>
AuthorDate: Mon Nov 10 12:50:59 2025 -0800
CASSJAVA-116: Retry or Speculative Execution with RequestIdGenerator throws
"Duplicate Key"
patch by Jane He; reviewed by Andy Tolbert and Lukasz Atoniak for
CASSJAVA-116
---
.../api/core/tracker/RequestIdGenerator.java | 29 ++++++----
.../core/cql/CqlRequestHandlerRetryTest.java | 63 ++++++++++++++++++++++
.../core/cql/RequestHandlerTestHarness.java | 10 +++-
.../driver/core/tracker/RequestIdGeneratorIT.java | 21 +++++++-
4 files changed, 110 insertions(+), 13 deletions(-)
diff --git
a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestIdGenerator.java
b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestIdGenerator.java
index 59ac3fdac..21db3793b 100644
---
a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestIdGenerator.java
+++
b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestIdGenerator.java
@@ -19,20 +19,21 @@ package com.datastax.oss.driver.api.core.tracker;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.session.Request;
-import
com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
/**
* Interface responsible for generating request IDs.
*
- * <p>Note that all request IDs have a parent/child relationship. A "parent
ID" can loosely be
- * thought of as encompassing a sequence of a request + any attendant retries,
speculative
+ * <p>Note that all request IDs have a parent/child relationship. A "session
request ID" can loosely
+ * be thought of as encompassing a sequence of a request + any attendant
retries, speculative
* executions etc. It's scope is identical to that of a {@link
- * com.datastax.oss.driver.internal.core.cql.CqlRequestHandler}. A "request
ID" represents a single
- * request within this larger scope. Note that a request corresponding to a
request ID may be
+ * com.datastax.oss.driver.internal.core.cql.CqlRequestHandler}. A "node
request ID" represents a
+ * single request within this larger scope. Note that a request corresponding
to a request ID may be
* retried; in that case the retry count will be appended to the corresponding
identifier in the
* logs.
*/
@@ -67,11 +68,17 @@ public interface RequestIdGenerator {
default Statement<?> getDecoratedStatement(
@NonNull Statement<?> statement, @NonNull String requestId) {
- Map<String, ByteBuffer> customPayload =
- NullAllowingImmutableMap.<String, ByteBuffer>builder()
- .putAll(statement.getCustomPayload())
- .put(getCustomPayloadKey(),
ByteBuffer.wrap(requestId.getBytes(StandardCharsets.UTF_8)))
- .build();
- return statement.setCustomPayload(customPayload);
+
+ Map<String, ByteBuffer> existing = new
HashMap<>(statement.getCustomPayload());
+ String key = getCustomPayloadKey();
+
+ // Add or overwrite
+ existing.put(key,
ByteBuffer.wrap(requestId.getBytes(StandardCharsets.UTF_8)));
+
+ // Allowing null key/values
+ // Wrap a map inside to be immutable without instanciating a new map
+ Map<String, ByteBuffer> unmodifiableMap =
Collections.unmodifiableMap(existing);
+
+ return statement.setCustomPayload(unmodifiableMap);
}
}
diff --git
a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerRetryTest.java
b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerRetryTest.java
index bea52891c..ccac873c6 100644
---
a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerRetryTest.java
+++
b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerRetryTest.java
@@ -48,6 +48,8 @@ import
com.datastax.oss.driver.api.core.servererrors.ReadTimeoutException;
import com.datastax.oss.driver.api.core.servererrors.ServerError;
import com.datastax.oss.driver.api.core.servererrors.UnavailableException;
import com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException;
+import com.datastax.oss.driver.api.core.session.Request;
+import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
import com.datastax.oss.protocol.internal.ProtocolConstants;
import com.datastax.oss.protocol.internal.response.Error;
import com.datastax.oss.protocol.internal.response.error.ReadTimeout;
@@ -55,9 +57,13 @@ import
com.datastax.oss.protocol.internal.response.error.Unavailable;
import com.datastax.oss.protocol.internal.response.error.WriteTimeout;
import com.tngtech.java.junit.dataprovider.DataProvider;
import com.tngtech.java.junit.dataprovider.UseDataProvider;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
public class CqlRequestHandlerRetryTest extends CqlRequestHandlerTestBase {
@@ -384,6 +390,63 @@ public class CqlRequestHandlerRetryTest extends
CqlRequestHandlerTestBase {
}
}
+ @Test
+ @UseDataProvider("failureAndIdempotent")
+ public void
should_not_fail_with_duplicate_key_when_retrying_with_request_id_generator(
+ FailureScenario failureScenario, boolean defaultIdempotence,
Statement<?> statement) {
+
+ // Create a RequestIdGenerator that uses the same key as the statement's
custom payload
+ RequestIdGenerator requestIdGenerator =
+ new RequestIdGenerator() {
+ private AtomicInteger counter = new AtomicInteger(0);
+
+ @Override
+ public String getSessionRequestId() {
+ return "session-123";
+ }
+
+ @Override
+ public String getNodeRequestId(@NonNull Request request, @NonNull
String parentId) {
+ return parentId + "-" + counter.getAndIncrement();
+ }
+ };
+
+ RequestHandlerTestHarness.Builder harnessBuilder =
+ RequestHandlerTestHarness.builder()
+ .withDefaultIdempotence(defaultIdempotence)
+ .withRequestIdGenerator(requestIdGenerator);
+ failureScenario.mockRequestError(harnessBuilder, node1);
+ harnessBuilder.withResponse(node2, defaultFrameOf(singleRow()));
+
+ try (RequestHandlerTestHarness harness = harnessBuilder.build()) {
+ failureScenario.mockRetryPolicyVerdict(
+ harness.getContext().getRetryPolicy(anyString()),
RetryVerdict.RETRY_NEXT);
+
+ CompletionStage<AsyncResultSet> resultSetFuture =
+ new CqlRequestHandler(statement, harness.getSession(),
harness.getContext(), "test")
+ .handle();
+
+ // The test should succeed without throwing a duplicate key exception
+ assertThatStage(resultSetFuture)
+ .isSuccess(
+ resultSet -> {
+ Iterator<Row> rows = resultSet.currentPage().iterator();
+ assertThat(rows.hasNext()).isTrue();
+ assertThat(rows.next().getString("message")).isEqualTo("hello,
world");
+
+ ExecutionInfo executionInfo = resultSet.getExecutionInfo();
+ assertThat(executionInfo.getCoordinator()).isEqualTo(node2);
+ assertThat(executionInfo.getErrors()).hasSize(1);
+
assertThat(executionInfo.getErrors().get(0).getKey()).isEqualTo(node1);
+
+ // Verify that the custom payload still contains the request
ID key
+ // (either the original value or the generated one, depending
on implementation)
+
assertThat(executionInfo.getRequest().getCustomPayload().get("request-id"))
+
.isEqualTo(ByteBuffer.wrap("session-123-1".getBytes(StandardCharsets.UTF_8)));
+ });
+ }
+ }
+
/**
* Sets up the mocks to simulate an error from a node, and make the retry
policy return a given
* decision for that error.
diff --git
a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java
b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java
index 6ecd61119..6a7657d58 100644
---
a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java
+++
b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/RequestHandlerTestHarness.java
@@ -37,6 +37,7 @@ import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy;
import com.datastax.oss.driver.api.core.time.TimestampGenerator;
+import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.datastax.oss.driver.internal.core.DefaultConsistencyLevelRegistry;
import com.datastax.oss.driver.internal.core.ProtocolFeature;
@@ -170,7 +171,8 @@ public class RequestHandlerTestHarness implements
AutoCloseable {
when(context.getRequestTracker()).thenReturn(new
NoopRequestTracker(context));
- when(context.getRequestIdGenerator()).thenReturn(Optional.empty());
+ when(context.getRequestIdGenerator())
+ .thenReturn(Optional.ofNullable(builder.requestIdGenerator));
}
public DefaultSession getSession() {
@@ -203,6 +205,7 @@ public class RequestHandlerTestHarness implements
AutoCloseable {
private final List<PoolBehavior> poolBehaviors = new ArrayList<>();
private boolean defaultIdempotence;
private ProtocolVersion protocolVersion;
+ private RequestIdGenerator requestIdGenerator;
/**
* Sets the given node as the next one in the query plan; an empty pool
will be simulated when
@@ -258,6 +261,11 @@ public class RequestHandlerTestHarness implements
AutoCloseable {
return this;
}
+ public Builder withRequestIdGenerator(RequestIdGenerator
requestIdGenerator) {
+ this.requestIdGenerator = requestIdGenerator;
+ return this;
+ }
+
/**
* Sets the given node as the next one in the query plan; the test code is
responsible of
* calling the methods on the returned object to complete the write and
the query.
diff --git
a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestIdGeneratorIT.java
b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestIdGeneratorIT.java
index 2848a8fb6..516a62bb1 100644
---
a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestIdGeneratorIT.java
+++
b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestIdGeneratorIT.java
@@ -17,12 +17,14 @@
*/
package com.datastax.oss.driver.core.tracker;
+import static com.datastax.oss.driver.Assertions.assertThatStage;
import static org.assertj.core.api.Assertions.assertThat;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator;
@@ -119,7 +121,24 @@ public class RequestIdGeneratorIT {
try (CqlSession session = SessionUtils.newSession(ccmRule, loader)) {
String query = "SELECT * FROM system.local";
ResultSet rs = session.execute(query);
-
assertThat(rs.getExecutionInfo().getRequest().getCustomPayload().get("trace_key")).isNull();
+
assertThat(rs.getExecutionInfo().getRequest().getCustomPayload().get("request-id")).isNull();
+ }
+ }
+
+ @Test
+ public void should_succeed_with_null_value_in_custom_payload() {
+ DriverConfigLoader loader =
+ SessionUtils.configLoaderBuilder()
+ .withString(
+ DefaultDriverOption.REQUEST_ID_GENERATOR_CLASS,
"W3CContextRequestIdGenerator")
+ .build();
+ try (CqlSession session = SessionUtils.newSession(ccmRule, loader)) {
+ String query = "SELECT * FROM system.local";
+ Map<String, ByteBuffer> customPayload =
+ new NullAllowingImmutableMap.Builder<String,
ByteBuffer>(1).put("my_key", null).build();
+ SimpleStatement statement =
+ SimpleStatement.newInstance(query).setCustomPayload(customPayload);
+ assertThatStage(session.executeAsync(statement)).isSuccess();
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]