This is an automated email from the ASF dual-hosted git repository.
virajjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git
The following commit(s) were added to refs/heads/main by this push:
new 9cc68b2 Add randomized 25-index DynamoDB Query/Scan parity test
against eventually-consistent indexes
9cc68b2 is described below
commit 9cc68b2e7a7477c9621662072f4c78f06a9416e2
Author: Aditya Auradkar <[email protected]>
AuthorDate: Thu May 14 14:06:28 2026 -0700
Add randomized 25-index DynamoDB Query/Scan parity test against
eventually-consistent indexes
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../phoenix/ddb/RandomizedIndexParityIT.java | 624 +++++++++++++++++++++
.../java/org/apache/phoenix/ddb/TestUtils.java | 1 +
2 files changed, 625 insertions(+)
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/RandomizedIndexParityIT.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/RandomizedIndexParityIT.java
new file mode 100644
index 0000000..bbd71c7
--- /dev/null
+++
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/RandomizedIndexParityIT.java
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.ddb;
+
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.ddb.rest.RESTServer;
+import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.util.ServerUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
+import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
+import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
+import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+/**
+ * Randomized parity test that exercises the DynamoDB per-table index limit
+ * (20 GSI + 5 LSI = 25 indexes) under mixed CRUD load and verifies Phoenix
+ * matches LocalDynamoDB on Query and Scan against random indexes.
+ *
+ * Tunable via system properties:
+ * -DrandomIndexParity.scale=small|full (default full: 15k items, 5+5 read
rounds)
+ * -DrandomIndexParity.seed=<long> (default 42; logged on test start)
+ */
+public class RandomizedIndexParityIT {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RandomizedIndexParityIT.class);
+
+ private static final int NUM_GSI = 20;
+ private static final int NUM_LSI = 5;
+ private static final int BATCH_SIZE = 25;
+
+ private final DynamoDbClient dynamoDbClient =
+ LocalDynamoDbTestBase.localDynamoDb().createV2Client();
+ private static DynamoDbClient phoenixDBClientV2;
+
+ private static HBaseTestingUtility utility = null;
+ private static String tmpDir;
+ private static RESTServer restServer = null;
+
+ @Rule
+ public final TestName testName = new TestName();
+
+    /**
+     * One-time fixture for the class. Starts LocalDynamoDB, boots an HBase
+     * mini-cluster, registers the Phoenix test driver and launches the REST
+     * server on top of the cluster configuration. {@code phoenixDBClientV2}
+     * is then pointed at the REST server's address, so the test can send the
+     * same SDK requests to it and to the LocalDynamoDB-backed
+     * {@code dynamoDbClient} and compare the answers.
+     */
+    @BeforeClass
+    public static void initialize() throws Exception {
+        // Remember the temp dir up front; tearDown() writes it back.
+        tmpDir = System.getProperty("java.io.tmpdir");
+        LocalDynamoDbTestBase.localDynamoDb().start();
+
+        Configuration clusterConf = TestUtils.getConfigForMiniCluster();
+        utility = new HBaseTestingUtility(clusterConf);
+        setUpConfigForMiniCluster(clusterConf);
+        utility.startMiniCluster();
+        DriverManager.registerDriver(new PhoenixTestDriver());
+
+        restServer = new RESTServer(utility.getConfiguration());
+        restServer.run();
+        String serverClass = restServer.getClass().getName();
+        LOGGER.info("started {} on port {}", serverClass, restServer.getPort());
+
+        String endpoint = "http://" + restServer.getServerAddress();
+        phoenixDBClientV2 = LocalDynamoDB.createV2Client(endpoint);
+    }
+
+    /**
+     * Releases everything {@link #initialize()} started, in the same order as
+     * before: LocalDynamoDB, the REST server, Phoenix connections and driver,
+     * the HBase mini-cluster, the server metadata cache, and finally the
+     * {@code java.io.tmpdir} system property.
+     *
+     * The steps are chained with {@code try/finally} so that a failure in an
+     * early step (for example LocalDynamoDB refusing to stop) no longer
+     * prevents the later ones from running; otherwise a leaked mini-cluster
+     * or a stale tmpdir could affect test classes that run afterwards in the
+     * same JVM. If several steps throw, the exception from the last failing
+     * step is the one that propagates.
+     */
+    @AfterClass
+    public static void tearDown() throws Exception {
+        try {
+            LocalDynamoDbTestBase.localDynamoDb().stop();
+        } finally {
+            try {
+                // restServer stays null when initialize() failed early.
+                if (restServer != null) {
+                    restServer.stop();
+                }
+            } finally {
+                try {
+                    ServerUtil.ConnectionFactory.shutdown();
+                    DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+                } finally {
+                    try {
+                        if (utility != null) {
+                            utility.shutdownMiniCluster();
+                        }
+                    } finally {
+                        ServerMetadataCacheTestImpl.resetCache();
+                        // System.setProperty rejects null values, so only
+                        // restore the property if it was actually captured.
+                        if (tmpDir != null) {
+                            System.setProperty("java.io.tmpdir", tmpDir);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    // Timeout is 30 minutes. Besides the workload itself, every read round
+    // may be retried up to 6 times with growing sleeps (see
+    // assertParityEventually), and full scale runs 5 query + 5 scan rounds.
+    /**
+     * End-to-end scenario: create the 25-index table on both clients, seed
+     * it, apply a random Put/Update/Delete pass, wait for the indexes to
+     * settle, then run the Query and Scan parity campaigns.
+     *
+     * System properties read here:
+     * {@code randomIndexParity.seed} (default 42) seeds the single
+     * {@link Random} that drives all choices;
+     * {@code randomIndexParity.scale} (default "full") selects 15000 items
+     * and 5+5 read rounds when it equals "full" ignoring case, and 2000 items
+     * and 3+3 read rounds for any other value.
+     */
+    @Test(timeout = 1800000)
+    public void randomCrudParityAcross25Indexes() throws Exception {
+        long seed = Long.parseLong(
+            System.getProperty("randomIndexParity.seed", "42"));
+        String scale = System.getProperty("randomIndexParity.scale", "full");
+
+        boolean fullScale = "full".equalsIgnoreCase(scale);
+        int numItems = fullScale ? 15000 : 2000;
+        int numQueryRounds = fullScale ? 5 : 3;
+        int numScanRounds = fullScale ? 5 : 3;
+        LOGGER.info(
+            "RandomizedIndexParityIT seed={} scale={} numItems={} "
+                + "queryRounds={} scanRounds={}",
+            seed, scale, numItems, numQueryRounds, numScanRounds);
+
+        Random rng = new Random(seed);
+        String tableName =
+            ("RIP_" + testName.getMethodName()).toUpperCase(Locale.ROOT);
+        createTableWith25Indexes(tableName);
+
+        // Per the original author's note, Phoenix's default
+        // UPDATE_CACHE_FREQUENCY is 60s; sleeping just past that is meant to
+        // make the next client connection re-fetch the new index metadata.
+        Thread.sleep(61000);
+
+        // Insertion-ordered set: together with the seeded RNG this keeps
+        // sample() deterministic for a given seed, so runs are reproducible.
+        Set<KeyPair> liveKeys = new LinkedHashSet<>();
+        bulkSeed(tableName, numItems, rng, liveKeys);
+        randomCrudPass(tableName, numItems / 4, rng, liveKeys);
+
+        LOGGER.info(
+            "Workload complete; live keys = {}. Waiting for index "
+                + "consistency...",
+            liveKeys.size());
+        // The wait helper is deliberately invoked twice in a row.
+        for (int pass = 0; pass < 2; pass++) {
+            TestUtils.waitForEventualConsistentIndex();
+        }
+
+        runQueryCampaign(tableName, numQueryRounds, rng);
+        runScanCampaign(tableName, numScanRounds, rng);
+    }
+
+ // ---------- table & schema
-----------------------------------------------
+
+ /**
+ * Creates the test table with the DynamoDB per-table maximum: 20 GSIs + 5
LSIs.
+ * GSI sort-key types alternate S/N (and LSI alternates N/S) to cover both
scalar
+ * encodings on the index key path. After both clients return, asserts the
+ * resulting table descriptions are identical so we know the schema landed
+ * intact on both sides before any reads run.
+ */
+ private void createTableWith25Indexes(String tableName) throws Exception {
+ CreateTableRequest req = DDLTestUtils.getCreateTableRequest(
+ tableName, "pk", ScalarAttributeType.S, "sk",
ScalarAttributeType.N);
+
+ for (int i = 0; i < NUM_GSI; i++) {
+ ScalarAttributeType skType =
+ (i % 2 == 0) ? ScalarAttributeType.S :
ScalarAttributeType.N;
+ req = DDLTestUtils.addIndexToRequest(true, req, gsiName(i),
+ gsiPk(i), ScalarAttributeType.S, gsiSk(i), skType);
+ }
+ for (int i = 0; i < NUM_LSI; i++) {
+ ScalarAttributeType skType =
+ (i % 2 == 0) ? ScalarAttributeType.N :
ScalarAttributeType.S;
+ req = DDLTestUtils.addIndexToRequest(false, req, lsiName(i),
+ "pk", ScalarAttributeType.S, lsiSk(i), skType);
+ }
+
+ dynamoDbClient.createTable(req);
+ phoenixDBClientV2.createTable(req);
+
+ DescribeTableRequest dtr =
DescribeTableRequest.builder().tableName(tableName).build();
+ DDLTestUtils.assertTableDescriptions(
+ dynamoDbClient.describeTable(dtr).table(),
+ phoenixDBClientV2.describeTable(dtr).table());
+ LOGGER.info("Created {} with {} GSIs and {} LSIs on both clients",
+ tableName, NUM_GSI, NUM_LSI);
+ }
+
+ // ---------- workload
-----------------------------------------------------
+
+    /**
+     * Initial population of the table. Generates {@code numItems} items with
+     * fresh keys and writes them to both clients through
+     * {@link #executeBatch}, flushing whenever {@link #BATCH_SIZE} requests
+     * have accumulated (25, the BatchWriteItem per-request limit) and once
+     * more at the end for any remainder.
+     *
+     * Each generated key is recorded in {@code liveKeys} as soon as its
+     * write request is queued, i.e. before the batch containing it is sent.
+     *
+     * @param tableName target table on both clients
+     * @param numItems  number of items to generate
+     * @param rng       seeded random source for keys and attribute values
+     * @param liveKeys  tracker of existing keys; receives every new key
+     */
+    private void bulkSeed(String tableName, int numItems, Random rng,
+            Set<KeyPair> liveKeys) {
+        List<WriteRequest> pending = new ArrayList<>(BATCH_SIZE);
+        int generated = 0;
+        while (generated < numItems) {
+            KeyPair key = newUniqueKey(rng, liveKeys);
+            PutRequest put =
+                PutRequest.builder().item(buildItem(key, rng)).build();
+            pending.add(WriteRequest.builder().putRequest(put).build());
+            liveKeys.add(key);
+            generated++;
+
+            if (pending.size() == BATCH_SIZE) {
+                executeBatch(tableName, pending);
+                pending.clear();
+            }
+        }
+        // Flush the final, partially filled batch.
+        if (!pending.isEmpty()) {
+            executeBatch(tableName, pending);
+        }
+        LOGGER.info("Seeded {} items via BatchWriteItem", numItems);
+    }
+
+    /**
+     * Applies {@code ops} random single-item operations. Each iteration draws
+     * a number in [0, 3): 0 means Put of a brand-new key, 1 means Update of a
+     * sampled live key, 2 means Delete of a sampled live key. A Put is forced
+     * whenever {@code liveKeys} is empty, since there is nothing to update or
+     * delete.
+     *
+     * Every request object is sent to the Phoenix client first and then,
+     * unchanged, to LocalDynamoDB. {@code liveKeys} is adjusted only after
+     * both calls have returned. Updates are built by
+     * {@link #buildRandomUpdate}, which touches GSI key attributes and the
+     * payload but never LSI sort keys.
+     *
+     * @param tableName target table on both clients
+     * @param ops       number of operations to perform
+     * @param rng       seeded random source
+     * @param liveKeys  tracker of existing keys, kept in step with the writes
+     */
+    private void randomCrudPass(String tableName, int ops, Random rng,
+            Set<KeyPair> liveKeys) {
+        int puts = 0;
+        int updates = 0;
+        int deletes = 0;
+
+        for (int op = 0; op < ops; op++) {
+            // The roll is always drawn, even when a Put ends up being forced.
+            int roll = rng.nextInt(3);
+
+            if (liveKeys.isEmpty() || roll == 0) {
+                KeyPair fresh = newUniqueKey(rng, liveKeys);
+                PutItemRequest put = PutItemRequest.builder()
+                    .tableName(tableName)
+                    .item(buildItem(fresh, rng))
+                    .build();
+                phoenixDBClientV2.putItem(put);
+                dynamoDbClient.putItem(put);
+                liveKeys.add(fresh);
+                puts++;
+                continue;
+            }
+
+            KeyPair target = sample(liveKeys, rng);
+            if (roll == 1) {
+                UpdateItemRequest update =
+                    buildRandomUpdate(tableName, target, rng);
+                phoenixDBClientV2.updateItem(update);
+                dynamoDbClient.updateItem(update);
+                updates++;
+                continue;
+            }
+
+            DeleteItemRequest delete = DeleteItemRequest.builder()
+                .tableName(tableName)
+                .key(target.toKeyMap())
+                .build();
+            phoenixDBClientV2.deleteItem(delete);
+            dynamoDbClient.deleteItem(delete);
+            liveKeys.remove(target);
+            deletes++;
+        }
+
+        LOGGER.info(
+            "Random CRUD pass: puts={} updates={} deletes={} liveAfter={}",
+            puts, updates, deletes, liveKeys.size());
+    }
+
+    /**
+     * Builds an {@link UpdateItemRequest} whose {@code SET} expression
+     * overwrites the hash and sort key attributes of one to three distinct,
+     * randomly chosen GSIs and then the "payload" attribute.
+     *
+     * The new GSI hash key is always a string; the new GSI sort key is a
+     * string for even GSI numbers and a number for odd ones, matching the
+     * types declared in {@link #createTableWith25Indexes}. LSI sort keys are
+     * never part of the expression.
+     *
+     * @param tableName table the update targets
+     * @param key       primary key of the item to update
+     * @param rng       seeded random source
+     * @return the request, with placeholder names and values filled in
+     */
+    private UpdateItemRequest buildRandomUpdate(String tableName, KeyPair key,
+            Random rng) {
+        Map<String, String> names = new HashMap<>();
+        Map<String, AttributeValue> values = new HashMap<>();
+        List<String> assignments = new ArrayList<>();
+
+        int gsiCount = 1 + rng.nextInt(3);
+        Set<Integer> used = new HashSet<>();
+        while (used.size() < gsiCount) {
+            int idx = rng.nextInt(NUM_GSI);
+            if (!used.add(idx)) {
+                // Already mutating this GSI in this request; draw again.
+                continue;
+            }
+
+            String hashName = "#gpk" + idx;
+            String hashValue = ":gpk" + idx;
+            String sortName = "#gsk" + idx;
+            String sortValue = ":gsk" + idx;
+            names.put(hashName, gsiPk(idx));
+            names.put(sortName, gsiSk(idx));
+
+            // Draw order matters for reproducibility: hash value first.
+            values.put(hashValue,
+                AttributeValue.builder().s(randStr(rng, "g" + idx, 6)).build());
+            AttributeValue newSort;
+            if (idx % 2 == 0) {
+                newSort = AttributeValue.builder()
+                    .s(randStr(rng, "s", 4)).build();
+            } else {
+                newSort = AttributeValue.builder()
+                    .n(Integer.toString(rng.nextInt(10000))).build();
+            }
+            values.put(sortValue, newSort);
+
+            assignments.add(hashName + " = " + hashValue
+                + ", " + sortName + " = " + sortValue);
+        }
+
+        // The payload is rewritten on every update.
+        names.put("#p", "payload");
+        values.put(":p",
+            AttributeValue.builder().s(randStr(rng, "p", 12)).build());
+        assignments.add("#p = :p");
+
+        String expression = "SET " + String.join(", ", assignments);
+        return UpdateItemRequest.builder()
+            .tableName(tableName)
+            .key(key.toKeyMap())
+            .updateExpression(expression)
+            .expressionAttributeNames(names)
+            .expressionAttributeValues(values)
+            .build();
+    }
+
+ // ---------- read campaigns ----------------------------------------------
+
+ /**
+ * For each round, picks a random index (GSI or LSI weighted by count),
probes
+ * the base table for an item that has the index hash-key populated so the
+ * Query has a real target, and then asserts Phoenix and LocalDDB return
the
+ * same items. Randomly toggles {@code scanIndexForward} and a payload
+ * {@code FilterExpression} to widen the surface area covered per run.
+ */
+ private void runQueryCampaign(String tableName, int rounds, Random rng) {
+ for (int r = 0; r < rounds; r++) {
+ boolean useGsi = rng.nextDouble() < ((double) NUM_GSI / (NUM_GSI +
NUM_LSI));
+ int idx = useGsi ? rng.nextInt(NUM_GSI) : rng.nextInt(NUM_LSI);
+ String indexName = useGsi ? gsiName(idx) : lsiName(idx);
+
+ // Find any item with the index hash-key set so the query has a
target.
+ // Probed once per round and reused across assertParityEventually
retries —
+ // safe today because randomCrudPass has already finished. If
anyone adds
+ // writes inside the read campaign, refresh the probe inside the
lambda.
+ Map<String, AttributeValue> probe = findItemWithIndex(tableName,
useGsi, idx);
+ if (probe == null) {
+ LOGGER.info("Query round {}: no items present for {};
skipping", r, indexName);
+ continue;
+ }
+
+ String hashAttr = useGsi ? gsiPk(idx) : "pk";
+ Map<String, String> names = new HashMap<>();
+ names.put("#h", hashAttr);
+ Map<String, AttributeValue> values = new HashMap<>();
+ values.put(":h", probe.get(hashAttr));
+
+ QueryRequest.Builder qr = QueryRequest.builder()
+ .tableName(tableName)
+ .indexName(indexName)
+ .keyConditionExpression("#h = :h");
+ if (rng.nextBoolean()) {
+ qr.scanIndexForward(false);
+ }
+ if (rng.nextBoolean() && probe.containsKey("payload")) {
+ // Filter narrows results to the probed item's payload — both
sides
+ // should agree on the (small) result set; this is
filter-codepath coverage.
+ names.put("#p", "payload");
+ values.put(":p", probe.get("payload"));
+ qr.filterExpression("#p = :p");
+ }
+
qr.expressionAttributeNames(names).expressionAttributeValues(values);
+ LOGGER.info("Query round {} index={} hash={}", r, indexName,
probe.get(hashAttr));
+
+ // Wrap the parity comparison in a lambda so
assertParityEventually can
+ // re-invoke it on each retry. Each call rebuilds the same
QueryRequest
+ // and runs it against both clients, throwing AssertionError on
mismatch
+ // — that's the signal assertParityEventually waits on.
+ String label = "Query round " + r + " index=" + indexName;
+ assertParityEventually(label,
+ () -> {
+ // compareQueryOutputs paginates by mutating
exclusiveStartKey on the
+ // builder; on retry we must reset it so we don't
resume from the prior
+ // run's last page.
+ qr.exclusiveStartKey(null);
+ TestUtils.compareQueryOutputs(qr, phoenixDBClientV2,
dynamoDbClient);
+ });
+ }
+ }
+
+ /**
+ * For each round, picks a random index and runs a paginated Scan with a
small
+ * {@code Limit} to force multiple round-trips, asserting Phoenix and
LocalDDB
+ * return the same set of items. Underlying compare uses sorted equality so
+ * non-deterministic Scan ordering across the two implementations doesn't
+ * cause spurious failures.
+ */
+ private void runScanCampaign(String tableName, int rounds, Random rng) {
+ for (int r = 0; r < rounds; r++) {
+ boolean useGsi = rng.nextDouble() < ((double) NUM_GSI / (NUM_GSI +
NUM_LSI));
+ int idx = useGsi ? rng.nextInt(NUM_GSI) : rng.nextInt(NUM_LSI);
+ String indexName = useGsi ? gsiName(idx) : lsiName(idx);
+
+ ScanRequest.Builder sr = ScanRequest.builder()
+ .tableName(tableName)
+ .indexName(indexName)
+ .limit(50 + rng.nextInt(450));
+
+ String hashAttr = useGsi ? gsiPk(idx) : "pk";
+ String sortAttr = useGsi ? gsiSk(idx) : lsiSk(idx);
+ ScalarAttributeType hashType = ScalarAttributeType.S;
+ ScalarAttributeType sortType;
+ if (useGsi) {
+ sortType = (idx % 2 == 0) ? ScalarAttributeType.S :
ScalarAttributeType.N;
+ } else {
+ sortType = (idx % 2 == 0) ? ScalarAttributeType.N :
ScalarAttributeType.S;
+ }
+ LOGGER.info("Scan round {} index={} limit={}", r, indexName,
sr.build().limit());
+
+ // Wrap the parity comparison in a lambda so
assertParityEventually can
+ // re-invoke it on each retry. Each call rebuilds the same
ScanRequest
+ // and runs it against both clients, throwing AssertionError on
mismatch
+ // — that's the signal assertParityEventually waits on.
+ // finalSortType: lambdas can only capture effectively-final
locals, but
+ // sortType is assigned in an if/else above, so we copy it into a
final.
+ ScalarAttributeType finalSortType = sortType;
+ String label = "Scan round " + r + " index=" + indexName;
+ assertParityEventually(label,
+ () -> {
+ // compareScanOutputs paginates by mutating
exclusiveStartKey on the
+ // builder; on retry we must reset it so we don't
resume from the prior
+ // run's last page.
+ sr.exclusiveStartKey(null);
+ TestUtils.compareScanOutputs(sr, phoenixDBClientV2,
dynamoDbClient,
+ hashAttr, sortAttr, hashType, finalSortType);
+ });
+ }
+ }
+
+    /**
+     * Runs {@code parityCheck} until it completes without throwing, for at
+     * most six attempts. After a failed attempt {@code k} (k = 1..5) the
+     * thread sleeps {@code k * 5} seconds, i.e. 5s, 10s, 15s, 20s and 25s,
+     * for a total of 75 seconds of waiting before giving up.
+     *
+     * Both {@link AssertionError} and {@link RuntimeException} count as a
+     * failed attempt, so a parity mismatch and an exception thrown while
+     * talking to either client get the same retry treatment.
+     *
+     * @param label       identifier used in log lines and the final error
+     * @param parityCheck the comparison to run; signals failure by throwing
+     * @throws AssertionError if every attempt fails (the last failure is the
+     *                        cause) or if the thread is interrupted while
+     *                        sleeping between attempts
+     */
+    private void assertParityEventually(String label, Runnable parityCheck) {
+        final int maxAttempts = 6;
+        final long backoffStepMs = 5000;
+
+        Throwable lastFailure = null;
+        int attempt = 1;
+        while (attempt <= maxAttempts) {
+            try {
+                parityCheck.run();
+                if (attempt != 1) {
+                    LOGGER.info("{} converged on attempt {}", label, attempt);
+                }
+                return;
+            } catch (AssertionError | RuntimeException failure) {
+                lastFailure = failure;
+                LOGGER.warn("{} failed on attempt {}/{}: {}: {}",
+                    label, attempt, maxAttempts,
+                    failure.getClass().getSimpleName(), failure.getMessage());
+            }
+
+            // No point sleeping after the final attempt.
+            if (attempt < maxAttempts) {
+                try {
+                    Thread.sleep(backoffStepMs * attempt);
+                } catch (InterruptedException ie) {
+                    // Preserve the interrupt flag for whoever is above us.
+                    Thread.currentThread().interrupt();
+                    throw new AssertionError(
+                        "Interrupted during retry of " + label, ie);
+                }
+            }
+            attempt++;
+        }
+        throw new AssertionError("Parity failure on " + label, lastFailure);
+    }
+
+ // ---------- helpers
------------------------------------------------------
+
+ /**
+ * Returns one base-table item that has the chosen index's hash-key
attribute
+ * present, so the Query round can build a {@code KeyConditionExpression}
that
+ * actually targets a row. Probes via LocalDDB (not Phoenix) on purpose:
any
+ * difference in what Phoenix sees for that key is what the parity check is
+ * meant to surface. Scans up to 500 items so sparse GSIs (~10% omission
rate)
+ * still find a hit; returns null only if every probed item lacks the attr.
+ */
+ private Map<String, AttributeValue> findItemWithIndex(String tableName,
boolean useGsi,
+ int idx) {
+ ScanRequest scan =
ScanRequest.builder().tableName(tableName).limit(500).build();
+ List<Map<String, AttributeValue>> items =
dynamoDbClient.scan(scan).items();
+ String hashAttr = useGsi ? gsiPk(idx) : "pk";
+ for (Map<String, AttributeValue> item : items) {
+ if (item.containsKey(hashAttr)) {
+ return item;
+ }
+ }
+ return null;
+ }
+
+    /**
+     * Sends one BatchWriteItem request containing a copy of {@code batch} to
+     * the Phoenix client and then to LocalDynamoDB, asserting after each
+     * call that the response reports no unprocessed items.
+     *
+     * Failing fast here matters because callers have already recorded the
+     * batch's keys as live; a silently dropped write would surface much
+     * later as a hard-to-trace parity failure.
+     *
+     * @param tableName table the write requests belong to
+     * @param batch     write requests to send; copied, so the caller may
+     *                  clear or reuse the list afterwards
+     */
+    private void executeBatch(String tableName, List<WriteRequest> batch) {
+        Map<String, List<WriteRequest>> perTable = new HashMap<>();
+        perTable.put(tableName, new ArrayList<>(batch));
+        BatchWriteItemRequest request = BatchWriteItemRequest.builder()
+            .requestItems(perTable)
+            .build();
+
+        BatchWriteItemResponse fromPhoenix =
+            phoenixDBClientV2.batchWriteItem(request);
+        Assert.assertTrue(
+            "Phoenix returned UnprocessedItems: "
+                + fromPhoenix.unprocessedItems(),
+            fromPhoenix.unprocessedItems().isEmpty());
+
+        BatchWriteItemResponse fromLocalDdb =
+            dynamoDbClient.batchWriteItem(request);
+        Assert.assertTrue(
+            "LocalDDB returned UnprocessedItems: "
+                + fromLocalDdb.unprocessedItems(),
+            fromLocalDdb.unprocessedItems().isEmpty());
+    }
+
+    /**
+     * Builds a random item for the given primary key.
+     *
+     * The item always has "pk", "sk", a 12-letter "payload" (prefixed "p")
+     * and all {@link #NUM_LSI} LSI sort keys. For each of the
+     * {@link #NUM_GSI} GSIs there is a 1-in-10 chance that both of its key
+     * attributes are left out, which makes the GSIs sparse; otherwise both
+     * are set. Value types follow the index definitions: even GSIs and odd
+     * LSIs get string sort keys, odd GSIs and even LSIs get numeric ones in
+     * [0, 10000).
+     *
+     * @param key primary key of the item
+     * @param rng seeded random source
+     * @return the attribute map for the new item
+     */
+    private Map<String, AttributeValue> buildItem(KeyPair key, Random rng) {
+        Map<String, AttributeValue> attrs = new HashMap<>();
+        attrs.put("pk", AttributeValue.builder().s(key.pk).build());
+        attrs.put("sk",
+            AttributeValue.builder().n(Long.toString(key.sk)).build());
+        attrs.put("payload",
+            AttributeValue.builder().s(randStr(rng, "p", 12)).build());
+
+        for (int g = 0; g < NUM_GSI; g++) {
+            // A roll of 0 omits this GSI's keys from the item entirely.
+            if (rng.nextInt(10) != 0) {
+                attrs.put(gsiPk(g),
+                    AttributeValue.builder().s(randStr(rng, "g" + g, 6)).build());
+                AttributeValue sortValue = (g % 2 == 0)
+                    ? AttributeValue.builder().s(randStr(rng, "s", 4)).build()
+                    : AttributeValue.builder()
+                        .n(Integer.toString(rng.nextInt(10000))).build();
+                attrs.put(gsiSk(g), sortValue);
+            }
+        }
+
+        for (int l = 0; l < NUM_LSI; l++) {
+            AttributeValue sortValue = (l % 2 == 0)
+                ? AttributeValue.builder()
+                    .n(Integer.toString(rng.nextInt(10000))).build()
+                : AttributeValue.builder().s(randStr(rng, "l", 4)).build();
+            attrs.put(lsiSk(l), sortValue);
+        }
+        return attrs;
+    }
+
+    /**
+     * Draws random keys ("pk_" plus 8 lowercase letters, and a non-negative
+     * int sort key) until one is found that is not in {@code existing}.
+     * The set itself is not modified.
+     *
+     * @param rng      seeded random source
+     * @param existing keys that must not be returned
+     * @return a key not contained in {@code existing}
+     */
+    private KeyPair newUniqueKey(Random rng, Set<KeyPair> existing) {
+        KeyPair candidate;
+        do {
+            // Hash key letters are drawn before the sort key.
+            String pk = "pk_" + randStr(rng, "", 8);
+            candidate = new KeyPair(pk, rng.nextInt(Integer.MAX_VALUE));
+        } while (existing.contains(candidate));
+        return candidate;
+    }
+
+    /**
+     * Returns the element at a uniformly random position of the set's
+     * iteration order. Costs one RNG draw and a walk of up to
+     * {@code set.size()} elements.
+     *
+     * @param set non-empty set to pick from ({@code Random.nextInt} rejects
+     *            a bound of zero)
+     * @param rng random source
+     * @return the chosen element
+     */
+    private static <T> T sample(Set<T> set, Random rng) {
+        int remaining = rng.nextInt(set.size());
+        for (T candidate : set) {
+            if (remaining == 0) {
+                return candidate;
+            }
+            remaining--;
+        }
+        // Only reachable if the set yields fewer elements than size().
+        throw new IllegalStateException();
+    }
+
+    /**
+     * Returns {@code prefix} followed by {@code len} random letters from
+     * 'a' to 'z', using one RNG draw per letter.
+     *
+     * @param rng    random source
+     * @param prefix text the result starts with
+     * @param len    number of random letters to append
+     * @return the generated string
+     */
+    private static String randStr(Random rng, String prefix, int len) {
+        StringBuilder out = new StringBuilder(prefix);
+        for (int remaining = len; remaining > 0; remaining--) {
+            char letter = (char) ('a' + rng.nextInt(26));
+            out.append(letter);
+        }
+        return out.toString();
+    }
+
+ private static String gsiName(int i) { return String.format(Locale.ROOT,
"gsi_%02d", i); }
+ private static String lsiName(int i) { return String.format(Locale.ROOT,
"lsi_%02d", i); }
+ private static String gsiPk(int i) { return String.format(Locale.ROOT,
"gsi_pk_%02d", i); }
+ private static String gsiSk(int i) { return String.format(Locale.ROOT,
"gsi_sk_%02d", i); }
+ private static String lsiSk(int i) { return String.format(Locale.ROOT,
"lsi_sk_%02d", i); }
+
+ private static final class KeyPair {
+ final String pk;
+ final long sk;
+
+ KeyPair(String pk, long sk) {
+ this.pk = pk;
+ this.sk = sk;
+ }
+
+ Map<String, AttributeValue> toKeyMap() {
+ Map<String, AttributeValue> m = new HashMap<>();
+ m.put("pk", AttributeValue.builder().s(pk).build());
+ m.put("sk", AttributeValue.builder().n(Long.toString(sk)).build());
+ return m;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof KeyPair)) return false;
+ KeyPair k = (KeyPair) o;
+ return sk == k.sk && pk.equals(k.pk);
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * pk.hashCode() + Long.hashCode(sk);
+ }
+ }
+}
diff --git
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
index 7b42ca1..7dbad9a 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
@@ -446,6 +446,7 @@ public class TestUtils {
sr.exclusiveStartKey(phoenixResponse.lastEvaluatedKey());
} while (phoenixResponse.hasLastEvaluatedKey());
+ sr.exclusiveStartKey(null);
List<Map<String, AttributeValue>> ddbResult = new ArrayList<>();
ScanResponse ddbResponse;
do {