This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git
The following commit(s) were added to refs/heads/main by this push:
     new 31edbe4  PHOENIX-7738 Tests with random filter expressions and segment scan (#2)
31edbe4 is described below
commit 31edbe4c4a6012d282cfb42d35c2b61e7f38429d
Author: Viraj Jasani <[email protected]>
AuthorDate: Tue Dec 2 22:20:51 2025 +0530
PHOENIX-7738 Tests with random filter expressions and segment scan (#2)
---
.asf.yaml | 2 +
.../org/apache/phoenix/ddb/BaseSegmentScanIT.java | 495 +++++++++++++++++++--
.../ddb/ConcurrentDifferentSegmentCountIT.java | 11 +-
.../phoenix/ddb/ConcurrentSegmentScanIT.java | 11 +-
.../java/org/apache/phoenix/ddb/ScanIndex3IT.java | 30 +-
.../org/apache/phoenix/ddb/SegmentScan1IT.java | 57 +++
.../org/apache/phoenix/ddb/SegmentScan2IT.java | 55 +++
.../java/org/apache/phoenix/ddb/SegmentScanIT.java | 129 +++---
8 files changed, 676 insertions(+), 114 deletions(-)
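For readers skimming the diff below: the new tests exercise DynamoDB's parallel scan API, in which a table is read as `totalSegments` disjoint slices, each request names the `segment` it wants, and pagination proceeds via LastEvaluatedKey. A minimal sketch of scanning one segment with an optional filter expression — an illustration against the AWS SDK v2 client these tests use, not code from this commit:

    static List<Map<String, AttributeValue>> scanSegment(DynamoDbClient client, String tableName,
            int segment, int totalSegments, String filterExpression,
            Map<String, String> names, Map<String, AttributeValue> values) {
        List<Map<String, AttributeValue>> items = new ArrayList<>();
        Map<String, AttributeValue> lastKey = null;
        do {
            ScanRequest.Builder builder = ScanRequest.builder()
                    .tableName(tableName)
                    .segment(segment)              // which slice of the key space to read
                    .totalSegments(totalSegments)  // how many slices the scan is divided into
                    .limit(50);
            if (filterExpression != null) {
                // Filters are applied server-side after the read, so they change the
                // items returned but not the pages visited.
                builder.filterExpression(filterExpression)
                        .expressionAttributeNames(names)
                        .expressionAttributeValues(values);
            }
            if (lastKey != null) {
                builder.exclusiveStartKey(lastKey); // resume where the previous page stopped
            }
            ScanResponse response = client.scan(builder.build());
            items.addAll(response.items());
            lastKey = response.hasLastEvaluatedKey() ? response.lastEvaluatedKey() : null;
        } while (lastKey != null);
        return items;
    }

Running this once per segment, serially or concurrently, and concatenating the results should yield the same item set as a plain full scan — which is exactly the invariant the tests below assert.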
diff --git a/.asf.yaml b/.asf.yaml
index 672a419..b35d86c 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -40,6 +40,8 @@ github:
squash: true
merge: false
rebase: false
+ autolink_jira:
+ - PHOENIX
notifications:
commits: [email protected]
issues: [email protected]
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BaseSegmentScanIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BaseSegmentScanIT.java
index 6587f62..7e514fc 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BaseSegmentScanIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BaseSegmentScanIT.java
@@ -34,10 +34,12 @@ import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
-import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -129,7 +131,7 @@ public abstract class BaseSegmentScanIT {
}
/**
- * Create a test item based on the configured key types.
+     * Create a test item based on the configured key types with rich attributes for filter testing.
*/
    protected Map<String, AttributeValue> createTestItem(int index, String hashKeyName,
            ScalarAttributeType hashKeyType, String sortKeyName, ScalarAttributeType sortKeyType) {
@@ -170,8 +172,54 @@ public abstract class BaseSegmentScanIT {
}
item.put("data", AttributeValue.builder().s("test_data_" +
index).build());
+ item.put("status",
AttributeValue.builder().s(getStatus(index)).build());
+ item.put("priority", AttributeValue.builder().n(String.valueOf(index %
10)).build());
+ item.put("active", AttributeValue.builder().bool(index % 2 ==
0).build());
+ item.put("amount", AttributeValue.builder().n(String.valueOf((index %
1000) + 1)).build());
+ item.put("category",
AttributeValue.builder().s(getCategory(index)).build());
+ item.put("description", AttributeValue.builder().s("description_" +
index).build());
+ item.put("timestamp",
AttributeValue.builder().n(String.valueOf(1000000 + index)).build());
+
+ List<String> tags = new ArrayList<>();
+ tags.add("tag_" + (index % 5));
+ tags.add("common_tag");
+ if (index % 3 == 0) {
+ tags.add("special_tag");
+ }
+ item.put("tags", AttributeValue.builder().ss(tags).build());
+
+ List<String> scores = new ArrayList<>();
+ scores.add(String.valueOf(index % 100));
+ scores.add(String.valueOf((index % 50) + 100));
+ item.put("scores", AttributeValue.builder().ns(scores).build());
+
+ Map<String, AttributeValue> metadata = new HashMap<>();
+ metadata.put("version", AttributeValue.builder().s("v" + (index %
3)).build());
+ metadata.put("count", AttributeValue.builder().n(String.valueOf(index
% 20)).build());
+ metadata.put("enabled", AttributeValue.builder().bool(index % 4 ==
0).build());
+ item.put("metadata", AttributeValue.builder().m(metadata).build());
+
+ List<AttributeValue> items = new ArrayList<>();
+ items.add(AttributeValue.builder().s("item_" + (index % 7)).build());
+        items.add(AttributeValue.builder().n(String.valueOf(index % 30)).build());
+ item.put("items", AttributeValue.builder().l(items).build());
+
+ if (index % 5 == 0) {
+ item.put("optional_field", AttributeValue.builder().s("optional_"
+ index).build());
+ }
+
return item;
}
+
+ private String getStatus(int index) {
+        String[] statuses = {"pending", "active", "completed", "failed", "cancelled"};
+ return statuses[index % statuses.length];
+ }
+
+ private String getCategory(int index) {
+        String[] categories = {"electronics", "books", "clothing", "food", "toys", "sports"};
+ return categories[index % categories.length];
+ }
/**
* Split the HBase table at a calculated point.
@@ -206,17 +254,31 @@ public abstract class BaseSegmentScanIT {
* Perform a full scan with pagination using the specified limit.
*/
    protected List<Map<String, AttributeValue>> performFullScanWithPagination(DynamoDbClient client,
- String tableName, boolean useFilter) {
+ String tableName, boolean useFilter, int filterNum) {
+        return performFullScanWithPagination(client, tableName, useFilter, filterNum, SCAN_LIMIT);
+ }
+
+ /**
+ * Perform a full scan with pagination using a custom scan limit.
+ */
+    protected List<Map<String, AttributeValue>> performFullScanWithPagination(DynamoDbClient client,
+            String tableName, boolean useFilter, int filterNum, int scanLimit) {
List<Map<String, AttributeValue>> allItems = new ArrayList<>();
Map<String, AttributeValue> lastEvaluatedKey = null;
do {
ScanRequest.Builder scanBuilder = ScanRequest.builder()
.tableName(tableName)
- .limit(SCAN_LIMIT);
+ .limit(scanLimit);
if (useFilter) {
-                scanBuilder.filterExpression(generateFilterExpression())
-                        .expressionAttributeNames(getFilterExpressionAttributeNames())
-                        .expressionAttributeValues(getFilterExpressionAttributeValues());
+ scanBuilder.filterExpression(getFilterExpression(filterNum));
+                Map<String, String> attrNames = getFilterAttributeNames(filterNum);
+ if (!attrNames.isEmpty()) {
+ scanBuilder.expressionAttributeNames(attrNames);
+ }
+                Map<String, AttributeValue> attrValues = getFilterAttributeValues(filterNum);
+ if (!attrValues.isEmpty()) {
+ scanBuilder.expressionAttributeValues(attrValues);
+ }
}
if (lastEvaluatedKey != null) {
scanBuilder.exclusiveStartKey(lastEvaluatedKey);
@@ -232,20 +294,23 @@ public abstract class BaseSegmentScanIT {
* Insert items with periodic table splitting to test segment scan behavior
* across different HBase regions.
*/
-    protected void insertItemsWithSplitting(String tableName, String hashKeyName, ScalarAttributeType hashKeyType,
-                                            String sortKeyName, ScalarAttributeType sortKeyType,
-                                            int totalItems, int splitFrequency, int sleepMillis) throws Exception {
+    protected void insertItemsWithSplitting(String tableName, String hashKeyName,
+            ScalarAttributeType hashKeyType, String sortKeyName, ScalarAttributeType sortKeyType,
+            int totalItems, int splitFrequency, int sleepMillis) throws Exception {
+ List<WriteRequest> batch = new ArrayList<>();
for (int i = 0; i < totalItems; i++) {
-            Map<String, AttributeValue> item = createTestItem(i, hashKeyName, hashKeyType, sortKeyName, sortKeyType);
-
- PutItemRequest putRequest = PutItemRequest.builder()
- .tableName(tableName)
- .item(item)
- .build();
-
- phoenixDBClientV2.putItem(putRequest);
- dynamoDbClient.putItem(putRequest);
-
+            Map<String, AttributeValue> item =
+                createTestItem(i, hashKeyName, hashKeyType, sortKeyName, sortKeyType);
+            WriteRequest writeRequest =
+                WriteRequest.builder().putRequest(PutRequest.builder().item(item).build())
+                    .build();
+            batch.add(writeRequest);
+            boolean shouldFlush = batch.size() >= 25 || (i > 0 && (i + 1) % splitFrequency == 0)
+                || i == totalItems - 1;
+ if (shouldFlush) {
+ executeBatchWrite(tableName, batch);
+ batch.clear();
+ }
// Periodically split the table to create multiple regions
if (i > 0 && i % splitFrequency == 0) {
splitTable(tableName, hashKeyType, i);
@@ -254,6 +319,19 @@ public abstract class BaseSegmentScanIT {
}
}
+ /**
+ * Execute batch write for both Phoenix and DynamoDB clients.
+ */
+    private void executeBatchWrite(String tableName, List<WriteRequest> batch) {
+ Map<String, List<WriteRequest>> requestItems = new HashMap<>();
+ requestItems.put(tableName, new ArrayList<>(batch));
+ BatchWriteItemRequest batchRequest =
+            BatchWriteItemRequest.builder().requestItems(requestItems).build();
+
+ phoenixDBClientV2.batchWriteItem(batchRequest);
+ dynamoDbClient.batchWriteItem(batchRequest);
+ }
+
/**
* Scan a single segment with pagination.
*/
@@ -262,7 +340,8 @@ public abstract class BaseSegmentScanIT {
int totalSegments,
int scanLimit,
boolean addDelay,
-                                                                                boolean useFilter) {
+                                                                                boolean useFilter,
+                                                                                int filterNum) {
List<Map<String, AttributeValue>> segmentItems = new ArrayList<>();
Map<String, AttributeValue> lastEvaluatedKey = null;
@@ -273,9 +352,15 @@ public abstract class BaseSegmentScanIT {
.totalSegments(totalSegments)
.limit(scanLimit);
if (useFilter) {
-                scanBuilder.filterExpression(generateFilterExpression())
-                        .expressionAttributeNames(getFilterExpressionAttributeNames())
-                        .expressionAttributeValues(getFilterExpressionAttributeValues());
+ scanBuilder.filterExpression(getFilterExpression(filterNum));
+                Map<String, String> attrNames = getFilterAttributeNames(filterNum);
+ if (!attrNames.isEmpty()) {
+ scanBuilder.expressionAttributeNames(attrNames);
+ }
+                Map<String, AttributeValue> attrValues = getFilterAttributeValues(filterNum);
+ if (!attrValues.isEmpty()) {
+ scanBuilder.expressionAttributeValues(attrValues);
+ }
}
if (lastEvaluatedKey != null) {
scanBuilder.exclusiveStartKey(lastEvaluatedKey);
@@ -299,19 +384,377 @@ public abstract class BaseSegmentScanIT {
return segmentItems;
}
+ protected String getFilterExpression(int filterNum) {
+ switch (filterNum) {
+ case 1:
+ return getFilterExpression1();
+ case 2:
+ return getFilterExpression2();
+ case 3:
+ return getFilterExpression3();
+ case 4:
+ return getFilterExpression4();
+ case 5:
+ return getFilterExpression5();
+ case 6:
+ return getFilterExpression6();
+ case 7:
+ return getFilterExpression7();
+ case 8:
+ return getFilterExpression8();
+ case 9:
+ return getFilterExpression9();
+ case 10:
+ return getFilterExpression10();
+ case 11:
+ return getFilterExpression11();
+ case 12:
+ return getFilterExpression12();
+ case 13:
+ return getFilterExpression13();
+ case 14:
+ return getFilterExpression14();
+ case 15:
+ return getFilterExpression15();
+ default:
+ return getFilterExpression1();
+ }
+ }
+
+ protected Map<String, String> getFilterAttributeNames(int filterNum) {
+ switch (filterNum) {
+ case 1:
+ return getFilterAttributeNames1();
+ case 2:
+ return getFilterAttributeNames2();
+ case 3:
+ return getFilterAttributeNames3();
+ case 4:
+ return getFilterAttributeNames4();
+ case 5:
+ return getFilterAttributeNames5();
+ case 6:
+ return getFilterAttributeNames6();
+ case 7:
+ return getFilterAttributeNames7();
+ case 8:
+ return getFilterAttributeNames8();
+ case 9:
+ return getFilterAttributeNames9();
+ case 10:
+ return getFilterAttributeNames10();
+ case 11:
+ return getFilterAttributeNames11();
+ case 12:
+ return getFilterAttributeNames12();
+ case 13:
+ return getFilterAttributeNames13();
+ case 14:
+ return getFilterAttributeNames14();
+ case 15:
+ return getFilterAttributeNames15();
+ default:
+ return getFilterAttributeNames1();
+ }
+ }
+
+    protected Map<String, AttributeValue> getFilterAttributeValues(int filterNum) {
+ switch (filterNum) {
+ case 1:
+ return getFilterAttributeValues1();
+ case 2:
+ return getFilterAttributeValues2();
+ case 3:
+ return getFilterAttributeValues3();
+ case 4:
+ return getFilterAttributeValues4();
+ case 5:
+ return getFilterAttributeValues5();
+ case 6:
+ return getFilterAttributeValues6();
+ case 7:
+ return getFilterAttributeValues7();
+ case 8:
+ return getFilterAttributeValues8();
+ case 9:
+ return getFilterAttributeValues9();
+ case 10:
+ return getFilterAttributeValues10();
+ case 11:
+ return getFilterAttributeValues11();
+ case 12:
+ return getFilterAttributeValues12();
+ case 13:
+ return getFilterAttributeValues13();
+ case 14:
+ return getFilterAttributeValues14();
+ case 15:
+ return getFilterAttributeValues15();
+ default:
+ return getFilterAttributeValues1();
+ }
+ }
+
protected String generateFilterExpression() {
- return "begins_with(#data, :prefix)";
+ return getFilterExpression1();
}
protected Map<String, String> getFilterExpressionAttributeNames() {
+ return getFilterAttributeNames1();
+ }
+
+    protected Map<String, AttributeValue> getFilterExpressionAttributeValues() {
+ return getFilterAttributeValues1();
+ }
+
+ protected String getFilterExpression1() {
+ return "begins_with(#data, :prefix)";
+ }
+
+ protected Map<String, String> getFilterAttributeNames1() {
Map<String, String> expressionAttributeNames = new HashMap<>();
expressionAttributeNames.put("#data", "data");
return expressionAttributeNames;
}
-    protected Map<String, AttributeValue> getFilterExpressionAttributeValues() {
+    protected Map<String, AttributeValue> getFilterAttributeValues1() {
        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":prefix", AttributeValue.builder().s("test_data_1").build());
return expressionAttributeValues;
}
+
+ protected String getFilterExpression2() {
+ return "#amount BETWEEN :minAmount AND :maxAmount";
+ }
+
+ protected Map<String, String> getFilterAttributeNames2() {
+ Map<String, String> names = new HashMap<>();
+ names.put("#amount", "amount");
+ return names;
+ }
+
+ protected Map<String, AttributeValue> getFilterAttributeValues2() {
+ Map<String, AttributeValue> values = new HashMap<>();
+ values.put(":minAmount", AttributeValue.builder().n("200").build());
+ values.put(":maxAmount", AttributeValue.builder().n("800").build());
+ return values;
+ }
+
+ protected String getFilterExpression3() {
+ return "#active = :isActive AND #priority > :minPriority";
+ }
+
+ protected Map<String, String> getFilterAttributeNames3() {
+ Map<String, String> names = new HashMap<>();
+ names.put("#active", "active");
+ names.put("#priority", "priority");
+ return names;
+ }
+
+ protected Map<String, AttributeValue> getFilterAttributeValues3() {
+ Map<String, AttributeValue> values = new HashMap<>();
+ values.put(":isActive", AttributeValue.builder().bool(true).build());
+ values.put(":minPriority", AttributeValue.builder().n("5").build());
+ return values;
+ }
+
+ protected String getFilterExpression4() {
+ return "#status IN (:status1, :status2, :status3)";
+ }
+
+ protected Map<String, String> getFilterAttributeNames4() {
+ Map<String, String> names = new HashMap<>();
+ names.put("#status", "status");
+ return names;
+ }
+
+ protected Map<String, AttributeValue> getFilterAttributeValues4() {
+ Map<String, AttributeValue> values = new HashMap<>();
+ values.put(":status1", AttributeValue.builder().s("active").build());
+ values.put(":status2", AttributeValue.builder().s("pending").build());
+ values.put(":status3",
AttributeValue.builder().s("completed").build());
+ return values;
+ }
+
+ protected String getFilterExpression5() {
+ return "contains(#tags, :tag)";
+ }
+
+ protected Map<String, String> getFilterAttributeNames5() {
+ Map<String, String> names = new HashMap<>();
+ names.put("#tags", "tags");
+ return names;
+ }
+
+ protected Map<String, AttributeValue> getFilterAttributeValues5() {
+ Map<String, AttributeValue> values = new HashMap<>();
+ values.put(":tag", AttributeValue.builder().s("special_tag").build());
+ return values;
+ }
+
+ protected String getFilterExpression6() {
+ return "#metadata.#version = :version AND #metadata.#enabled =
:enabled";
+ }
+
+ protected Map<String, String> getFilterAttributeNames6() {
+ Map<String, String> names = new HashMap<>();
+ names.put("#metadata", "metadata");
+ names.put("#version", "version");
+ names.put("#enabled", "enabled");
+ return names;
+ }
+
+ protected Map<String, AttributeValue> getFilterAttributeValues6() {
+ Map<String, AttributeValue> values = new HashMap<>();
+ values.put(":version", AttributeValue.builder().s("v1").build());
+ values.put(":enabled", AttributeValue.builder().bool(true).build());
+ return values;
+ }
+
+ protected String getFilterExpression7() {
+ return "attribute_exists(optional_field)";
+ }
+
+ protected Map<String, String> getFilterAttributeNames7() {
+ return new HashMap<>();
+ }
+
+ protected Map<String, AttributeValue> getFilterAttributeValues7() {
+ return new HashMap<>();
+ }
+
+ protected String getFilterExpression8() {
+ return "attribute_not_exists(optional_field)";
+ }
+
+ protected Map<String, String> getFilterAttributeNames8() {
+ return new HashMap<>();
+ }
+
+ protected Map<String, AttributeValue> getFilterAttributeValues8() {
+ return new HashMap<>();
+ }
+
+ protected String getFilterExpression9() {
+ return "(#category = :category AND #amount > :minAmount) OR "
+ + "(#status = :status AND #active = :active)";
+ }
+
+ protected Map<String, String> getFilterAttributeNames9() {
+ Map<String, String> names = new HashMap<>();
+ names.put("#category", "category");
+ names.put("#amount", "amount");
+ names.put("#status", "status");
+ names.put("#active", "active");
+ return names;
+ }
+
+ protected Map<String, AttributeValue> getFilterAttributeValues9() {
+ Map<String, AttributeValue> values = new HashMap<>();
+ values.put(":category",
AttributeValue.builder().s("electronics").build());
+ values.put(":minAmount", AttributeValue.builder().n("500").build());
+ values.put(":status", AttributeValue.builder().s("active").build());
+ values.put(":active", AttributeValue.builder().bool(true).build());
+ return values;
+ }
+
+ protected String getFilterExpression10() {
+ return "NOT (#status = :status)";
+ }
+
+ protected Map<String, String> getFilterAttributeNames10() {
+ Map<String, String> names = new HashMap<>();
+ names.put("#status", "status");
+ return names;
+ }
+
+ protected Map<String, AttributeValue> getFilterAttributeValues10() {
+ Map<String, AttributeValue> values = new HashMap<>();
+ values.put(":status", AttributeValue.builder().s("failed").build());
+ return values;
+ }
+
+ protected String getFilterExpression11() {
+ return "size(#items) = :size";
+ }
+
+ protected Map<String, String> getFilterAttributeNames11() {
+ Map<String, String> names = new HashMap<>();
+ names.put("#items", "items");
+ return names;
+ }
+
+ protected Map<String, AttributeValue> getFilterAttributeValues11() {
+ Map<String, AttributeValue> values = new HashMap<>();
+ values.put(":size", AttributeValue.builder().n("2").build());
+ return values;
+ }
+
+ protected String getFilterExpression12() {
+ return "contains(#description, :substring)";
+ }
+
+ protected Map<String, String> getFilterAttributeNames12() {
+ Map<String, String> names = new HashMap<>();
+ names.put("#description", "description");
+ return names;
+ }
+
+ protected Map<String, AttributeValue> getFilterAttributeValues12() {
+ Map<String, AttributeValue> values = new HashMap<>();
+ values.put(":substring",
AttributeValue.builder().s("description_1").build());
+ return values;
+ }
+
+ protected String getFilterExpression13() {
+ return "contains(#scores, :score)";
+ }
+
+ protected Map<String, String> getFilterAttributeNames13() {
+ Map<String, String> names = new HashMap<>();
+ names.put("#scores", "scores");
+ return names;
+ }
+
+ protected Map<String, AttributeValue> getFilterAttributeValues13() {
+ Map<String, AttributeValue> values = new HashMap<>();
+ values.put(":score", AttributeValue.builder().n("110").build());
+ return values;
+ }
+
+ protected String getFilterExpression14() {
+ return "#priority >= :minPri AND #priority <= :maxPri AND #category <>
:excludeCategory";
+ }
+
+ protected Map<String, String> getFilterAttributeNames14() {
+ Map<String, String> names = new HashMap<>();
+ names.put("#priority", "priority");
+ names.put("#category", "category");
+ return names;
+ }
+
+ protected Map<String, AttributeValue> getFilterAttributeValues14() {
+ Map<String, AttributeValue> values = new HashMap<>();
+ values.put(":minPri", AttributeValue.builder().n("3").build());
+ values.put(":maxPri", AttributeValue.builder().n("7").build());
+ values.put(":excludeCategory",
AttributeValue.builder().s("food").build());
+ return values;
+ }
+
+ protected String getFilterExpression15() {
+ return "#timestamp > :startTime AND #timestamp < :endTime";
+ }
+
+ protected Map<String, String> getFilterAttributeNames15() {
+ Map<String, String> names = new HashMap<>();
+ names.put("#timestamp", "timestamp");
+ return names;
+ }
+
+ protected Map<String, AttributeValue> getFilterAttributeValues15() {
+ Map<String, AttributeValue> values = new HashMap<>();
+ values.put(":startTime",
AttributeValue.builder().n("1010000").build());
+ values.put(":endTime", AttributeValue.builder().n("1015000").build());
+ return values;
+ }
}
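A note on the batching introduced above: BatchWriteItem accepts at most 25 write requests per call (hence the `batch.size() >= 25` flush), and even a successful call may return UnprocessedItems that the caller is expected to resend. The test helper does not retry; a hedged sketch of what a retry loop could look like, assuming a DynamoDbClient named `client`:

    Map<String, List<WriteRequest>> pending = new HashMap<>();
    pending.put(tableName, new ArrayList<>(batch));
    while (!pending.isEmpty()) {
        BatchWriteItemResponse response = client.batchWriteItem(
                BatchWriteItemRequest.builder().requestItems(pending).build());
        // Empty map when everything landed; production code would add backoff and a retry cap.
        pending = response.unprocessedItems();
    }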
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentDifferentSegmentCountIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentDifferentSegmentCountIT.java
index 852c767..60f3206 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentDifferentSegmentCountIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentDifferentSegmentCountIT.java
@@ -69,8 +69,10 @@ public class ConcurrentDifferentSegmentCountIT extends BaseSegmentScanIT {
TOTAL_ITEMS, SPLIT_FREQUENCY, 75);
// Perform full scan for comparison baseline
-        List<Map<String, AttributeValue>> fullScanItemsPhoenix = performFullScanWithPagination(phoenixDBClientV2, tableName, false);
-        List<Map<String, AttributeValue>> fullScanItemsDDB = performFullScanWithPagination(dynamoDbClient, tableName, false);
+        List<Map<String, AttributeValue>> fullScanItemsPhoenix =
+            performFullScanWithPagination(phoenixDBClientV2, tableName, false, 0);
+        List<Map<String, AttributeValue>> fullScanItemsDDB =
+            performFullScanWithPagination(dynamoDbClient, tableName, false, 0);
// Execute concurrent segment scans with different totalSegments
        List<Map<String, AttributeValue>> segmentScan3Items = performConcurrentSegmentScansWithDifferentCounts(tableName);
@@ -141,8 +143,9 @@ public class ConcurrentDifferentSegmentCountIT extends BaseSegmentScanIT {
// Scan each segment sequentially within this thread
for (int segment = 0; segment < totalSegments; segment++) {
-                List<Map<String, AttributeValue>> segmentItems =
-                    scanSingleSegmentWithPagination(tableName, segment, totalSegments, SCAN_LIMIT, false, false);
+                List<Map<String, AttributeValue>> segmentItems =
+                    scanSingleSegmentWithPagination(tableName, segment, totalSegments, SCAN_LIMIT,
+                        false, false, 0);
allItems.addAll(segmentItems);
LOGGER.info("Segment {}/{} scan found {} items", segment,
totalSegments, segmentItems.size());
}
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentSegmentScanIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentSegmentScanIT.java
index bd94d00..d26beab 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentSegmentScanIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentSegmentScanIT.java
@@ -70,8 +70,10 @@ public class ConcurrentSegmentScanIT extends BaseSegmentScanIT {
TOTAL_ITEMS, SPLIT_FREQUENCY, 50);
// Perform full scan for comparison
-        List<Map<String, AttributeValue>> fullScanItemsPhoenix = performFullScanWithPagination(phoenixDBClientV2, tableName, false);
-        List<Map<String, AttributeValue>> fullScanItemsDDB = performFullScanWithPagination(dynamoDbClient, tableName, false);
+        List<Map<String, AttributeValue>> fullScanItemsPhoenix =
+            performFullScanWithPagination(phoenixDBClientV2, tableName, false, 0);
+        List<Map<String, AttributeValue>> fullScanItemsDDB =
+            performFullScanWithPagination(dynamoDbClient, tableName, false, 0);
// Execute concurrent segment scans with concurrent splits
List<Map<String, AttributeValue>> concurrentSegmentItems =
@@ -138,8 +140,9 @@ public class ConcurrentSegmentScanIT extends BaseSegmentScanIT {
}
    private List<Map<String, AttributeValue>> scanSegmentWithPagination(String tableName, int segment) {
-        List<Map<String, AttributeValue>> segmentItems =
-            scanSingleSegmentWithPagination(tableName, segment, TOTAL_SEGMENTS, SCAN_LIMIT, true, false);
+        List<Map<String, AttributeValue>> segmentItems =
+            scanSingleSegmentWithPagination(tableName, segment, TOTAL_SEGMENTS, SCAN_LIMIT,
+                true, false, 0);
LOGGER.info("Segment {} scan completed with {} items", segment,
segmentItems.size());
return segmentItems;
}
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java
index c66b988..0dd7a78 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java
@@ -40,13 +40,15 @@ import org.junit.Test;
import org.junit.rules.TestName;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
-import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
@@ -87,6 +89,18 @@ public class ScanIndex3IT {
createTableAndInsertData();
}
+    private static void executeBatchWrite(String tableName, List<WriteRequest> batch) {
+ if (batch.isEmpty()) {
+ return;
+ }
+ Map<String, List<WriteRequest>> requestItems = new HashMap<>();
+ requestItems.put(tableName, new ArrayList<>(batch));
+ BatchWriteItemRequest batchRequest =
+            BatchWriteItemRequest.builder().requestItems(requestItems).build();
+ phoenixDBClientV2.batchWriteItem(batchRequest);
+ dynamoDbClient.batchWriteItem(batchRequest);
+ }
+
private static void createTableAndInsertData() {
        CreateTableRequest createTableRequest = DDLTestUtils.getCreateTableRequest(
            TABLE_NAME, "pk", ScalarAttributeType.S, "sk", ScalarAttributeType.N);
@@ -96,6 +110,7 @@ public class ScanIndex3IT {
phoenixDBClientV2.createTable(createTableRequest);
dynamoDbClient.createTable(createTableRequest);
+ List<WriteRequest> batch = new ArrayList<>();
for (int i = 0; i < NUM_RECORDS; i++) {
Map<String, AttributeValue> item = new HashMap<>();
item.put("pk", AttributeValue.builder().s("pk_" + (i %
100)).build());
@@ -126,10 +141,15 @@ public class ScanIndex3IT {
            nestedList.add(AttributeValue.builder().n(String.valueOf(i % 40)).build());
            nestedList.add(AttributeValue.builder().ss("set_in_list_" + (i % 3), "list_common").build());
item.put("itms", AttributeValue.builder().l(nestedList).build());
-
-            PutItemRequest putRequest = PutItemRequest.builder().tableName(TABLE_NAME).item(item).build();
- phoenixDBClientV2.putItem(putRequest);
- dynamoDbClient.putItem(putRequest);
+
+ WriteRequest writeRequest =
+                WriteRequest.builder().putRequest(PutRequest.builder().item(item).build())
+ .build();
+ batch.add(writeRequest);
+ if (batch.size() >= 25 || i == NUM_RECORDS - 1) {
+ executeBatchWrite(TABLE_NAME, batch);
+ batch.clear();
+ }
}
for (int i = 0; i < NUM_RECORDS; i++) {
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan1IT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan1IT.java
new file mode 100644
index 0000000..70fb38a
--- /dev/null
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan1IT.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.ddb;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+
+/**
+ * Segment scan tests - Part 1.
+ */
+@RunWith(Parameterized.class)
+public class SegmentScan1IT extends SegmentScanIT {
+
+    public SegmentScan1IT(String hashKeyName, ScalarAttributeType hashKeyType, String sortKeyName,
+ ScalarAttributeType sortKeyType, boolean useFilter) {
+ super(hashKeyName, hashKeyType, sortKeyName, sortKeyType, useFilter);
+ }
+
+ @Parameterized.Parameters(name = "Hash_{1}_Sort_{3}_Filter_{4}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ // Hash key only
+ {"pk", ScalarAttributeType.S, null, null, true},
+ {"pk", ScalarAttributeType.N, null, null, false},
+ {"pk", ScalarAttributeType.B, null, null, true},
+
+ // String Hash Key combinations
+ {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.S,
false},
+ {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.N,
true},
+ {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.B,
false},
+
+ // Number Hash Key combinations
+ {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.S,
true},
+ {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.N,
false},
+ {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.B,
true}
+ });
+ }
+}
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan2IT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan2IT.java
new file mode 100644
index 0000000..28e922c
--- /dev/null
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan2IT.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.ddb;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+
+/**
+ * Segment scan tests - Part 2.
+ */
+@RunWith(Parameterized.class)
+public class SegmentScan2IT extends SegmentScanIT {
+
+    public SegmentScan2IT(String hashKeyName, ScalarAttributeType hashKeyType, String sortKeyName,
+ ScalarAttributeType sortKeyType, boolean useFilter) {
+ super(hashKeyName, hashKeyType, sortKeyName, sortKeyType, useFilter);
+ }
+
+ @Parameterized.Parameters(name = "Hash_{1}_Sort_{3}_Filter_{4}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ // Binary Hash Key combinations
+ {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.S,
false},
+ {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.N,
true},
+ {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.B,
false},
+
+ // Additional combinations
+ {"pk", ScalarAttributeType.S, null, null, true},
+ {"pk", ScalarAttributeType.N, null, null, false},
+ {"pk", ScalarAttributeType.B, null, null, true},
+ {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.S,
false},
+ {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.N,
true},
+ {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.B,
false}
+ });
+ }
+}
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScanIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScanIT.java
index f325de7..8a168c3 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScanIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScanIT.java
@@ -19,91 +19,48 @@ package org.apache.phoenix.ddb;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import org.junit.Assert;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
/**
- * Comprehensive parametrized test for segment scan functionality.
- *
- * This test verifies that segment scans work correctly across different key configurations
- * ,segment counts, limits and filters. For each test run:
- * 1. Creates a table with specified key schema
- * 2. Inserts items while periodically splitting the underlying HBase table
- * 3. Performs both full scan and segment scan with pagination, limits and filters
- * 4. Verifies that both approaches return identical results to ddb
+ * Abstract base class for segment scan tests.
*/
-@RunWith(Parameterized.class)
-public class SegmentScanIT extends BaseSegmentScanIT {
-
- private final String hashKeyName;
- private final ScalarAttributeType hashKeyType;
- private final String sortKeyName;
- private final ScalarAttributeType sortKeyType;
- private final int totalSegments;
- private final int scanLimit;
- private final boolean useFilter;
- protected static final int TOTAL_ITEMS = 2000;
- protected static final int SPLIT_FREQUENCY = 500;
+public abstract class SegmentScanIT extends BaseSegmentScanIT {
+
+ protected final String hashKeyName;
+ protected final ScalarAttributeType hashKeyType;
+ protected final String sortKeyName;
+ protected final ScalarAttributeType sortKeyType;
+ protected final int totalSegments;
+ protected final boolean useFilter;
+ protected final int filterNum;
+ protected static final int TOTAL_ITEMS = 25000;
+ protected static final int SPLIT_FREQUENCY = 4000;
public SegmentScanIT(String hashKeyName, ScalarAttributeType hashKeyType,
String sortKeyName, ScalarAttributeType sortKeyType,
- int totalSegments, int scanLimit, boolean useFilter) {
+ boolean useFilter) {
this.hashKeyName = hashKeyName;
this.hashKeyType = hashKeyType;
this.sortKeyName = sortKeyName;
this.sortKeyType = sortKeyType;
- this.totalSegments = totalSegments;
- this.scanLimit = scanLimit;
+ Random random = new Random();
+ this.totalSegments = random.nextInt(8) + 1;
this.useFilter = useFilter;
+ this.filterNum = useFilter ? random.nextInt(15) + 1 : 0;
}
-    @Parameterized.Parameters(name = "Hash_{1}_Sort_{3}_Segments_{4}_Limit_{5}_Filter_{6}")
- public static Collection<Object[]> data() {
- return Arrays.asList(new Object[][] {
- // Hash key only
- {"pk", ScalarAttributeType.S, null, null, 3, 34, true},
- {"pk", ScalarAttributeType.N, null, null, 4, 52, false},
- {"pk", ScalarAttributeType.B, null, null, 2, 41, true},
-
- // String Hash Key combinations
- {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.S, 1, 28,
false},
- {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.N, 2, 59,
true},
- {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.B, 4, 45,
false},
-
- // Number Hash Key combinations
- {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.S, 1, 23,
true},
- {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.N, 2, 37,
false},
- {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.B, 3, 51,
true},
-
- // Binary Hash Key combinations
- {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.S, 2, 29,
false},
- {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.N, 4, 46,
true},
- {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.B, 3, 33,
false},
-
- // Total Segments > #regions
- {"pk", ScalarAttributeType.S, null, null, 5, 58, true},
- {"pk", ScalarAttributeType.N, null, null, 6, 25, false},
- {"pk", ScalarAttributeType.B, null, null, 7, 42, true},
- {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.S, 5, 31,
false},
- {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.N, 6, 55,
true},
- {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.B, 7, 38,
false},
- });
- }
-
-
- @Test(timeout = 300000)
+ @Test(timeout = 600000)
public void testSegmentScan() throws Exception {
-        final String tableName = testName.getMethodName().replaceAll("[\\[\\]]", "_");
+        final String tableName = testName.getMethodName().replaceAll("[\\[\\]]", "_")
+            + "_" + generateRandomString(7);
// Create table
        CreateTableRequest createTableRequest = DDLTestUtils.getCreateTableRequest(
@@ -112,33 +69,45 @@ public class SegmentScanIT extends BaseSegmentScanIT {
dynamoDbClient.createTable(createTableRequest);
// Insert items with periodic table splitting
-        insertItemsWithSplitting(tableName, hashKeyName, hashKeyType, sortKeyName, sortKeyType,
-            TOTAL_ITEMS, SPLIT_FREQUENCY, 100);
+        insertItemsWithSplitting(tableName, hashKeyName, hashKeyType, sortKeyName, sortKeyType,
+            TOTAL_ITEMS, SPLIT_FREQUENCY, 200);
+
+ Random random = new Random();
// Perform full scan with pagination
-        List<Map<String, AttributeValue>> fullScanItemsPhoenix = performFullScanWithPagination(phoenixDBClientV2, tableName, useFilter);
-        List<Map<String, AttributeValue>> fullScanItemsDDB = performFullScanWithPagination(dynamoDbClient, tableName, useFilter);
+        List<Map<String, AttributeValue>> fullScanItemsPhoenix =
+            performFullScanWithPagination(phoenixDBClientV2, tableName, useFilter, filterNum,
+                random.nextInt(150) + 1);
+        List<Map<String, AttributeValue>> fullScanItemsDDB =
+            performFullScanWithPagination(dynamoDbClient, tableName, useFilter, filterNum,
+                random.nextInt(200) + 1);
// Perform segment scan serially on all segments with pagination
-        List<Map<String, AttributeValue>> segmentScanItems = performSegmentScanWithPagination(tableName, useFilter);
+        List<Map<String, AttributeValue>> segmentScanItems =
+            performSegmentScanWithPagination(tableName, useFilter, filterNum);
// Verify results
-        TestUtils.verifyItemsEqual(fullScanItemsDDB, fullScanItemsPhoenix, hashKeyName, sortKeyName);
-        TestUtils.verifyItemsEqual(fullScanItemsPhoenix, segmentScanItems, hashKeyName, sortKeyName);
+        TestUtils.verifyItemsEqual(fullScanItemsDDB, fullScanItemsPhoenix, hashKeyName,
+            sortKeyName);
+        TestUtils.verifyItemsEqual(fullScanItemsPhoenix, segmentScanItems, hashKeyName,
+            sortKeyName);
}
/**
* Perform segment scan across all segments with pagination.
*/
-    private List<Map<String, AttributeValue>> performSegmentScanWithPagination(String tableName, boolean useFilter)
-            throws SQLException {
+    private List<Map<String, AttributeValue>> performSegmentScanWithPagination(String tableName,
+            boolean useFilter, int filterNum) throws SQLException {
List<Map<String, AttributeValue>> allItems = new ArrayList<>();
+ Random random = new Random();
// Scan each segment sequentially
        int numRegions = TestUtils.getNumberOfTableRegions(testConnection, tableName);
for (int segment = 0; segment < totalSegments; segment++) {
-            List<Map<String, AttributeValue>> segmentItems =
-                scanSingleSegmentWithPagination(tableName, segment, totalSegments, scanLimit, false, useFilter);
+            int scanLimit = random.nextInt(200) + 1;
+            List<Map<String, AttributeValue>> segmentItems =
+                scanSingleSegmentWithPagination(tableName, segment, totalSegments, scanLimit,
+                    false, useFilter, filterNum);
allItems.addAll(segmentItems);
if (segment < numRegions && !useFilter) {
Assert.assertFalse(segmentItems.isEmpty());
@@ -146,4 +115,14 @@ public class SegmentScanIT extends BaseSegmentScanIT {
}
return allItems;
}
-}
\ No newline at end of file
+
+ private String generateRandomString(int length) {
+ String chars = "abcdefghijklmnopqrstuvwxyz0123456789-_.";
+ Random random = new Random();
+ StringBuilder sb = new StringBuilder(length);
+ for (int i = 0; i < length; i++) {
+ sb.append(chars.charAt(random.nextInt(chars.length())));
+ }
+ return sb.toString();
+ }
+}
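One property of the rewritten SegmentScanIT worth knowing when reading failures: totalSegments, filterNum, and the per-request scan limits all come from an unseeded java.util.Random, so every run covers a different combination. If a run ever needs to be replayed, a hedged option (an illustration, not part of this commit) is to derive the generator from a logged seed:

    long seed = Long.getLong("segmentScan.seed", System.nanoTime()); // override via system property to replay
    LOGGER.info("SegmentScanIT random seed: {}", seed);
    Random random = new Random(seed); // same seed reproduces totalSegments, filterNum and limits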