This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git


The following commit(s) were added to refs/heads/main by this push:
     new 31edbe4  PHOENIX-7738 Tests with random filter expressions and segment 
scan (#2)
31edbe4 is described below

commit 31edbe4c4a6012d282cfb42d35c2b61e7f38429d
Author: Viraj Jasani <[email protected]>
AuthorDate: Tue Dec 2 22:20:51 2025 +0530

    PHOENIX-7738 Tests with random filter expressions and segment scan (#2)
---
 .asf.yaml                                          |   2 +
 .../org/apache/phoenix/ddb/BaseSegmentScanIT.java  | 495 +++++++++++++++++++--
 .../ddb/ConcurrentDifferentSegmentCountIT.java     |  11 +-
 .../phoenix/ddb/ConcurrentSegmentScanIT.java       |  11 +-
 .../java/org/apache/phoenix/ddb/ScanIndex3IT.java  |  30 +-
 .../org/apache/phoenix/ddb/SegmentScan1IT.java     |  57 +++
 .../org/apache/phoenix/ddb/SegmentScan2IT.java     |  55 +++
 .../java/org/apache/phoenix/ddb/SegmentScanIT.java | 129 +++---
 8 files changed, 676 insertions(+), 114 deletions(-)

diff --git a/.asf.yaml b/.asf.yaml
index 672a419..b35d86c 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -40,6 +40,8 @@ github:
     squash: true
     merge: false
     rebase: false
+  autolink_jira:
+    - PHOENIX
 notifications:
   commits: [email protected]
   issues: [email protected]
diff --git 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BaseSegmentScanIT.java 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BaseSegmentScanIT.java
index 6587f62..7e514fc 100644
--- 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BaseSegmentScanIT.java
+++ 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BaseSegmentScanIT.java
@@ -34,10 +34,12 @@ import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
-import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
 import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
 import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
 import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -129,7 +131,7 @@ public abstract class BaseSegmentScanIT {
     }
 
     /**
-     * Create a test item based on the configured key types.
+     * Create a test item based on the configured key types with rich 
attributes for filter testing.
      */
     protected Map<String, AttributeValue> createTestItem(int index, String 
hashKeyName, ScalarAttributeType hashKeyType,
                                                        String sortKeyName, 
ScalarAttributeType sortKeyType) {
@@ -170,8 +172,54 @@ public abstract class BaseSegmentScanIT {
         }
         
         item.put("data", AttributeValue.builder().s("test_data_" + 
index).build());
+        item.put("status", 
AttributeValue.builder().s(getStatus(index)).build());
+        item.put("priority", AttributeValue.builder().n(String.valueOf(index % 
10)).build());
+        item.put("active", AttributeValue.builder().bool(index % 2 == 
0).build());
+        item.put("amount", AttributeValue.builder().n(String.valueOf((index % 
1000) + 1)).build());
+        item.put("category", 
AttributeValue.builder().s(getCategory(index)).build());
+        item.put("description", AttributeValue.builder().s("description_" + 
index).build());
+        item.put("timestamp", 
AttributeValue.builder().n(String.valueOf(1000000 + index)).build());
+        
+        List<String> tags = new ArrayList<>();
+        tags.add("tag_" + (index % 5));
+        tags.add("common_tag");
+        if (index % 3 == 0) {
+            tags.add("special_tag");
+        }
+        item.put("tags", AttributeValue.builder().ss(tags).build());
+        
+        List<String> scores = new ArrayList<>();
+        scores.add(String.valueOf(index % 100));
+        scores.add(String.valueOf((index % 50) + 100));
+        item.put("scores", AttributeValue.builder().ns(scores).build());
+        
+        Map<String, AttributeValue> metadata = new HashMap<>();
+        metadata.put("version", AttributeValue.builder().s("v" + (index % 
3)).build());
+        metadata.put("count", AttributeValue.builder().n(String.valueOf(index 
% 20)).build());
+        metadata.put("enabled", AttributeValue.builder().bool(index % 4 == 
0).build());
+        item.put("metadata", AttributeValue.builder().m(metadata).build());
+        
+        List<AttributeValue> items = new ArrayList<>();
+        items.add(AttributeValue.builder().s("item_" + (index % 7)).build());
+        items.add(AttributeValue.builder().n(String.valueOf(index % 
30)).build());
+        item.put("items", AttributeValue.builder().l(items).build());
+        
+        if (index % 5 == 0) {
+            item.put("optional_field", AttributeValue.builder().s("optional_" 
+ index).build());
+        }
+        
         return item;
     }
+    
+    private String getStatus(int index) {
+        String[] statuses = {"pending", "active", "completed", "failed", 
"cancelled"};
+        return statuses[index % statuses.length];
+    }
+    
+    private String getCategory(int index) {
+        String[] categories = {"electronics", "books", "clothing", "food", 
"toys", "sports"};
+        return categories[index % categories.length];
+    }
 
     /**
      * Split the HBase table at a calculated point.
@@ -206,17 +254,31 @@ public abstract class BaseSegmentScanIT {
      * Perform a full scan with pagination using the specified limit.
      */
     protected List<Map<String, AttributeValue>> 
performFullScanWithPagination(DynamoDbClient client,
-            String tableName, boolean useFilter) {
+            String tableName, boolean useFilter, int filterNum) {
+        return performFullScanWithPagination(client, tableName, useFilter, 
filterNum, SCAN_LIMIT);
+    }
+
+    /**
+     * Perform a full scan with pagination using a custom scan limit.
+     */
+    protected List<Map<String, AttributeValue>> 
performFullScanWithPagination(DynamoDbClient client,
+            String tableName, boolean useFilter, int filterNum, int scanLimit) 
{
         List<Map<String, AttributeValue>> allItems = new ArrayList<>();
         Map<String, AttributeValue> lastEvaluatedKey = null;
         do {
             ScanRequest.Builder scanBuilder = ScanRequest.builder()
                     .tableName(tableName)
-                    .limit(SCAN_LIMIT);
+                    .limit(scanLimit);
             if (useFilter) {
-                scanBuilder.filterExpression(generateFilterExpression())
-                           
.expressionAttributeNames(getFilterExpressionAttributeNames())
-                           
.expressionAttributeValues(getFilterExpressionAttributeValues());
+                scanBuilder.filterExpression(getFilterExpression(filterNum));
+                Map<String, String> attrNames = 
getFilterAttributeNames(filterNum);
+                if (!attrNames.isEmpty()) {
+                    scanBuilder.expressionAttributeNames(attrNames);
+                }
+                Map<String, AttributeValue> attrValues = 
getFilterAttributeValues(filterNum);
+                if (!attrValues.isEmpty()) {
+                    scanBuilder.expressionAttributeValues(attrValues);
+                }
             }
             if (lastEvaluatedKey != null) {
                 scanBuilder.exclusiveStartKey(lastEvaluatedKey);
@@ -232,20 +294,23 @@ public abstract class BaseSegmentScanIT {
      * Insert items with periodic table splitting to test segment scan behavior
      * across different HBase regions.
      */
-    protected void insertItemsWithSplitting(String tableName, String 
hashKeyName, ScalarAttributeType hashKeyType,
-                                          String sortKeyName, 
ScalarAttributeType sortKeyType, 
-                                          int totalItems, int splitFrequency, 
int sleepMillis) throws Exception {
+    protected void insertItemsWithSplitting(String tableName, String 
hashKeyName,
+            ScalarAttributeType hashKeyType, String sortKeyName, 
ScalarAttributeType sortKeyType,
+            int totalItems, int splitFrequency, int sleepMillis) throws 
Exception {
+        List<WriteRequest> batch = new ArrayList<>();
         for (int i = 0; i < totalItems; i++) {
-            Map<String, AttributeValue> item = createTestItem(i, hashKeyName, 
hashKeyType, sortKeyName, sortKeyType);
-            
-            PutItemRequest putRequest = PutItemRequest.builder()
-                    .tableName(tableName)
-                    .item(item)
-                    .build();
-            
-            phoenixDBClientV2.putItem(putRequest);
-            dynamoDbClient.putItem(putRequest);
-            
+            Map<String, AttributeValue> item =
+                    createTestItem(i, hashKeyName, hashKeyType, sortKeyName, 
sortKeyType);
+            WriteRequest writeRequest =
+                    
WriteRequest.builder().putRequest(PutRequest.builder().item(item).build())
+                            .build();
+            batch.add(writeRequest);
+            boolean shouldFlush = batch.size() >= 25 || (i > 0 && (i + 1) % 
splitFrequency == 0)
+                    || i == totalItems - 1;
+            if (shouldFlush) {
+                executeBatchWrite(tableName, batch);
+                batch.clear();
+            }
             // Periodically split the table to create multiple regions
             if (i > 0 && i % splitFrequency == 0) {
                 splitTable(tableName, hashKeyType, i);
@@ -254,6 +319,19 @@ public abstract class BaseSegmentScanIT {
         }
     }
 
+    /**
+     * Execute batch write for both Phoenix and DynamoDB clients.
+     */
+    private void executeBatchWrite(String tableName, List<WriteRequest> batch) 
{
+        Map<String, List<WriteRequest>> requestItems = new HashMap<>();
+        requestItems.put(tableName, new ArrayList<>(batch));
+        BatchWriteItemRequest batchRequest =
+                
BatchWriteItemRequest.builder().requestItems(requestItems).build();
+
+        phoenixDBClientV2.batchWriteItem(batchRequest);
+        dynamoDbClient.batchWriteItem(batchRequest);
+    }
+
     /**
      * Scan a single segment with pagination.
      */
@@ -262,7 +340,8 @@ public abstract class BaseSegmentScanIT {
                                                                                
int totalSegments, 
                                                                                
int scanLimit,
                                                                                
boolean addDelay,
-                                                                               
boolean useFilter) {
+                                                                               
boolean useFilter,
+                                                                               
int filterNum) {
         List<Map<String, AttributeValue>> segmentItems = new ArrayList<>();
         Map<String, AttributeValue> lastEvaluatedKey = null;
         
@@ -273,9 +352,15 @@ public abstract class BaseSegmentScanIT {
                     .totalSegments(totalSegments)
                     .limit(scanLimit);
             if (useFilter) {
-                scanBuilder.filterExpression(generateFilterExpression())
-                           
.expressionAttributeNames(getFilterExpressionAttributeNames())
-                           
.expressionAttributeValues(getFilterExpressionAttributeValues());
+                scanBuilder.filterExpression(getFilterExpression(filterNum));
+                Map<String, String> attrNames = 
getFilterAttributeNames(filterNum);
+                if (!attrNames.isEmpty()) {
+                    scanBuilder.expressionAttributeNames(attrNames);
+                }
+                Map<String, AttributeValue> attrValues = 
getFilterAttributeValues(filterNum);
+                if (!attrValues.isEmpty()) {
+                    scanBuilder.expressionAttributeValues(attrValues);
+                }
             }
             if (lastEvaluatedKey != null) {
                 scanBuilder.exclusiveStartKey(lastEvaluatedKey);
@@ -299,19 +384,377 @@ public abstract class BaseSegmentScanIT {
         return segmentItems;
     }
 
+    protected String getFilterExpression(int filterNum) {
+        switch (filterNum) {
+            case 1:
+                return getFilterExpression1();
+            case 2:
+                return getFilterExpression2();
+            case 3:
+                return getFilterExpression3();
+            case 4:
+                return getFilterExpression4();
+            case 5:
+                return getFilterExpression5();
+            case 6:
+                return getFilterExpression6();
+            case 7:
+                return getFilterExpression7();
+            case 8:
+                return getFilterExpression8();
+            case 9:
+                return getFilterExpression9();
+            case 10:
+                return getFilterExpression10();
+            case 11:
+                return getFilterExpression11();
+            case 12:
+                return getFilterExpression12();
+            case 13:
+                return getFilterExpression13();
+            case 14:
+                return getFilterExpression14();
+            case 15:
+                return getFilterExpression15();
+            default:
+                return getFilterExpression1();
+        }
+    }
+
+    protected Map<String, String> getFilterAttributeNames(int filterNum) {
+        switch (filterNum) {
+            case 1:
+                return getFilterAttributeNames1();
+            case 2:
+                return getFilterAttributeNames2();
+            case 3:
+                return getFilterAttributeNames3();
+            case 4:
+                return getFilterAttributeNames4();
+            case 5:
+                return getFilterAttributeNames5();
+            case 6:
+                return getFilterAttributeNames6();
+            case 7:
+                return getFilterAttributeNames7();
+            case 8:
+                return getFilterAttributeNames8();
+            case 9:
+                return getFilterAttributeNames9();
+            case 10:
+                return getFilterAttributeNames10();
+            case 11:
+                return getFilterAttributeNames11();
+            case 12:
+                return getFilterAttributeNames12();
+            case 13:
+                return getFilterAttributeNames13();
+            case 14:
+                return getFilterAttributeNames14();
+            case 15:
+                return getFilterAttributeNames15();
+            default:
+                return getFilterAttributeNames1();
+        }
+    }
+
+    protected Map<String, AttributeValue> getFilterAttributeValues(int 
filterNum) {
+        switch (filterNum) {
+            case 1:
+                return getFilterAttributeValues1();
+            case 2:
+                return getFilterAttributeValues2();
+            case 3:
+                return getFilterAttributeValues3();
+            case 4:
+                return getFilterAttributeValues4();
+            case 5:
+                return getFilterAttributeValues5();
+            case 6:
+                return getFilterAttributeValues6();
+            case 7:
+                return getFilterAttributeValues7();
+            case 8:
+                return getFilterAttributeValues8();
+            case 9:
+                return getFilterAttributeValues9();
+            case 10:
+                return getFilterAttributeValues10();
+            case 11:
+                return getFilterAttributeValues11();
+            case 12:
+                return getFilterAttributeValues12();
+            case 13:
+                return getFilterAttributeValues13();
+            case 14:
+                return getFilterAttributeValues14();
+            case 15:
+                return getFilterAttributeValues15();
+            default:
+                return getFilterAttributeValues1();
+        }
+    }
+
     protected String generateFilterExpression() {
-        return "begins_with(#data, :prefix)";
+        return getFilterExpression1();
     }
 
     protected Map<String, String> getFilterExpressionAttributeNames() {
+        return getFilterAttributeNames1();
+    }
+
+    protected Map<String, AttributeValue> getFilterExpressionAttributeValues() 
{
+        return getFilterAttributeValues1();
+    }
+
+    protected String getFilterExpression1() {
+        return "begins_with(#data, :prefix)";
+    }
+
+    protected Map<String, String> getFilterAttributeNames1() {
         Map<String, String> expressionAttributeNames = new HashMap<>();
         expressionAttributeNames.put("#data", "data");
         return expressionAttributeNames;
     }
 
-    protected Map<String, AttributeValue> getFilterExpressionAttributeValues() 
{
+    protected Map<String, AttributeValue> getFilterAttributeValues1() {
         Map<String, AttributeValue> expressionAttributeValues = new 
HashMap<>();
         expressionAttributeValues.put(":prefix", 
AttributeValue.builder().s("test_data_1").build());
         return expressionAttributeValues;
     }
+
+    protected String getFilterExpression2() {
+        return "#amount BETWEEN :minAmount AND :maxAmount";
+    }
+
+    protected Map<String, String> getFilterAttributeNames2() {
+        Map<String, String> names = new HashMap<>();
+        names.put("#amount", "amount");
+        return names;
+    }
+
+    protected Map<String, AttributeValue> getFilterAttributeValues2() {
+        Map<String, AttributeValue> values = new HashMap<>();
+        values.put(":minAmount", AttributeValue.builder().n("200").build());
+        values.put(":maxAmount", AttributeValue.builder().n("800").build());
+        return values;
+    }
+
+    protected String getFilterExpression3() {
+        return "#active = :isActive AND #priority > :minPriority";
+    }
+
+    protected Map<String, String> getFilterAttributeNames3() {
+        Map<String, String> names = new HashMap<>();
+        names.put("#active", "active");
+        names.put("#priority", "priority");
+        return names;
+    }
+
+    protected Map<String, AttributeValue> getFilterAttributeValues3() {
+        Map<String, AttributeValue> values = new HashMap<>();
+        values.put(":isActive", AttributeValue.builder().bool(true).build());
+        values.put(":minPriority", AttributeValue.builder().n("5").build());
+        return values;
+    }
+
+    protected String getFilterExpression4() {
+        return "#status IN (:status1, :status2, :status3)";
+    }
+
+    protected Map<String, String> getFilterAttributeNames4() {
+        Map<String, String> names = new HashMap<>();
+        names.put("#status", "status");
+        return names;
+    }
+
+    protected Map<String, AttributeValue> getFilterAttributeValues4() {
+        Map<String, AttributeValue> values = new HashMap<>();
+        values.put(":status1", AttributeValue.builder().s("active").build());
+        values.put(":status2", AttributeValue.builder().s("pending").build());
+        values.put(":status3", 
AttributeValue.builder().s("completed").build());
+        return values;
+    }
+
+    protected String getFilterExpression5() {
+        return "contains(#tags, :tag)";
+    }
+
+    protected Map<String, String> getFilterAttributeNames5() {
+        Map<String, String> names = new HashMap<>();
+        names.put("#tags", "tags");
+        return names;
+    }
+
+    protected Map<String, AttributeValue> getFilterAttributeValues5() {
+        Map<String, AttributeValue> values = new HashMap<>();
+        values.put(":tag", AttributeValue.builder().s("special_tag").build());
+        return values;
+    }
+
+    protected String getFilterExpression6() {
+        return "#metadata.#version = :version AND #metadata.#enabled = 
:enabled";
+    }
+
+    protected Map<String, String> getFilterAttributeNames6() {
+        Map<String, String> names = new HashMap<>();
+        names.put("#metadata", "metadata");
+        names.put("#version", "version");
+        names.put("#enabled", "enabled");
+        return names;
+    }
+
+    protected Map<String, AttributeValue> getFilterAttributeValues6() {
+        Map<String, AttributeValue> values = new HashMap<>();
+        values.put(":version", AttributeValue.builder().s("v1").build());
+        values.put(":enabled", AttributeValue.builder().bool(true).build());
+        return values;
+    }
+
+    protected String getFilterExpression7() {
+        return "attribute_exists(optional_field)";
+    }
+
+    protected Map<String, String> getFilterAttributeNames7() {
+        return new HashMap<>();
+    }
+
+    protected Map<String, AttributeValue> getFilterAttributeValues7() {
+        return new HashMap<>();
+    }
+
+    protected String getFilterExpression8() {
+        return "attribute_not_exists(optional_field)";
+    }
+
+    protected Map<String, String> getFilterAttributeNames8() {
+        return new HashMap<>();
+    }
+
+    protected Map<String, AttributeValue> getFilterAttributeValues8() {
+        return new HashMap<>();
+    }
+
+    protected String getFilterExpression9() {
+        return "(#category = :category AND #amount > :minAmount) OR "
+                + "(#status = :status AND #active = :active)";
+    }
+
+    protected Map<String, String> getFilterAttributeNames9() {
+        Map<String, String> names = new HashMap<>();
+        names.put("#category", "category");
+        names.put("#amount", "amount");
+        names.put("#status", "status");
+        names.put("#active", "active");
+        return names;
+    }
+
+    protected Map<String, AttributeValue> getFilterAttributeValues9() {
+        Map<String, AttributeValue> values = new HashMap<>();
+        values.put(":category", 
AttributeValue.builder().s("electronics").build());
+        values.put(":minAmount", AttributeValue.builder().n("500").build());
+        values.put(":status", AttributeValue.builder().s("active").build());
+        values.put(":active", AttributeValue.builder().bool(true).build());
+        return values;
+    }
+
+    protected String getFilterExpression10() {
+        return "NOT (#status = :status)";
+    }
+
+    protected Map<String, String> getFilterAttributeNames10() {
+        Map<String, String> names = new HashMap<>();
+        names.put("#status", "status");
+        return names;
+    }
+
+    protected Map<String, AttributeValue> getFilterAttributeValues10() {
+        Map<String, AttributeValue> values = new HashMap<>();
+        values.put(":status", AttributeValue.builder().s("failed").build());
+        return values;
+    }
+
+    protected String getFilterExpression11() {
+        return "size(#items) = :size";
+    }
+
+    protected Map<String, String> getFilterAttributeNames11() {
+        Map<String, String> names = new HashMap<>();
+        names.put("#items", "items");
+        return names;
+    }
+
+    protected Map<String, AttributeValue> getFilterAttributeValues11() {
+        Map<String, AttributeValue> values = new HashMap<>();
+        values.put(":size", AttributeValue.builder().n("2").build());
+        return values;
+    }
+
+    protected String getFilterExpression12() {
+        return "contains(#description, :substring)";
+    }
+
+    protected Map<String, String> getFilterAttributeNames12() {
+        Map<String, String> names = new HashMap<>();
+        names.put("#description", "description");
+        return names;
+    }
+
+    protected Map<String, AttributeValue> getFilterAttributeValues12() {
+        Map<String, AttributeValue> values = new HashMap<>();
+        values.put(":substring", 
AttributeValue.builder().s("description_1").build());
+        return values;
+    }
+
+    protected String getFilterExpression13() {
+        return "contains(#scores, :score)";
+    }
+
+    protected Map<String, String> getFilterAttributeNames13() {
+        Map<String, String> names = new HashMap<>();
+        names.put("#scores", "scores");
+        return names;
+    }
+
+    protected Map<String, AttributeValue> getFilterAttributeValues13() {
+        Map<String, AttributeValue> values = new HashMap<>();
+        values.put(":score", AttributeValue.builder().n("110").build());
+        return values;
+    }
+
+    protected String getFilterExpression14() {
+        return "#priority >= :minPri AND #priority <= :maxPri AND #category <> 
:excludeCategory";
+    }
+
+    protected Map<String, String> getFilterAttributeNames14() {
+        Map<String, String> names = new HashMap<>();
+        names.put("#priority", "priority");
+        names.put("#category", "category");
+        return names;
+    }
+
+    protected Map<String, AttributeValue> getFilterAttributeValues14() {
+        Map<String, AttributeValue> values = new HashMap<>();
+        values.put(":minPri", AttributeValue.builder().n("3").build());
+        values.put(":maxPri", AttributeValue.builder().n("7").build());
+        values.put(":excludeCategory", 
AttributeValue.builder().s("food").build());
+        return values;
+    }
+
+    protected String getFilterExpression15() {
+        return "#timestamp > :startTime AND #timestamp < :endTime";
+    }
+
+    protected Map<String, String> getFilterAttributeNames15() {
+        Map<String, String> names = new HashMap<>();
+        names.put("#timestamp", "timestamp");
+        return names;
+    }
+
+    protected Map<String, AttributeValue> getFilterAttributeValues15() {
+        Map<String, AttributeValue> values = new HashMap<>();
+        values.put(":startTime", 
AttributeValue.builder().n("1010000").build());
+        values.put(":endTime", AttributeValue.builder().n("1015000").build());
+        return values;
+    }
 }
diff --git 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentDifferentSegmentCountIT.java
 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentDifferentSegmentCountIT.java
index 852c767..60f3206 100644
--- 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentDifferentSegmentCountIT.java
+++ 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentDifferentSegmentCountIT.java
@@ -69,8 +69,10 @@ public class ConcurrentDifferentSegmentCountIT extends 
BaseSegmentScanIT {
                                 TOTAL_ITEMS, SPLIT_FREQUENCY, 75);
 
         // Perform full scan for comparison baseline
-        List<Map<String, AttributeValue>> fullScanItemsPhoenix = 
performFullScanWithPagination(phoenixDBClientV2, tableName, false);
-        List<Map<String, AttributeValue>> fullScanItemsDDB = 
performFullScanWithPagination(dynamoDbClient, tableName, false);
+        List<Map<String, AttributeValue>> fullScanItemsPhoenix =
+                performFullScanWithPagination(phoenixDBClientV2, tableName, 
false, 0);
+        List<Map<String, AttributeValue>> fullScanItemsDDB =
+                performFullScanWithPagination(dynamoDbClient, tableName, 
false, 0);
 
         // Execute concurrent segment scans with different totalSegments
         List<Map<String, AttributeValue>> segmentScan3Items = 
performConcurrentSegmentScansWithDifferentCounts(tableName);
@@ -141,8 +143,9 @@ public class ConcurrentDifferentSegmentCountIT extends 
BaseSegmentScanIT {
         
         // Scan each segment sequentially within this thread
         for (int segment = 0; segment < totalSegments; segment++) {
-            List<Map<String, AttributeValue>> segmentItems = 
-                scanSingleSegmentWithPagination(tableName, segment, 
totalSegments, SCAN_LIMIT, false, false);
+            List<Map<String, AttributeValue>> segmentItems =
+                    scanSingleSegmentWithPagination(tableName, segment, 
totalSegments, SCAN_LIMIT,
+                            false, false, 0);
             allItems.addAll(segmentItems);
             LOGGER.info("Segment {}/{} scan found {} items", segment, 
totalSegments, segmentItems.size());
         }
diff --git 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentSegmentScanIT.java
 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentSegmentScanIT.java
index bd94d00..d26beab 100644
--- 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentSegmentScanIT.java
+++ 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentSegmentScanIT.java
@@ -70,8 +70,10 @@ public class ConcurrentSegmentScanIT extends 
BaseSegmentScanIT {
                                 TOTAL_ITEMS, SPLIT_FREQUENCY, 50);
 
         // Perform full scan for comparison
-        List<Map<String, AttributeValue>> fullScanItemsPhoenix = 
performFullScanWithPagination(phoenixDBClientV2, tableName, false);
-        List<Map<String, AttributeValue>> fullScanItemsDDB = 
performFullScanWithPagination(dynamoDbClient, tableName, false);
+        List<Map<String, AttributeValue>> fullScanItemsPhoenix =
+                performFullScanWithPagination(phoenixDBClientV2, tableName, 
false, 0);
+        List<Map<String, AttributeValue>> fullScanItemsDDB =
+                performFullScanWithPagination(dynamoDbClient, tableName, 
false, 0);
 
         // Execute concurrent segment scans with concurrent splits
         List<Map<String, AttributeValue>> concurrentSegmentItems = 
@@ -138,8 +140,9 @@ public class ConcurrentSegmentScanIT extends 
BaseSegmentScanIT {
     }
 
     private List<Map<String, AttributeValue>> scanSegmentWithPagination(String 
tableName, int segment) {
-        List<Map<String, AttributeValue>> segmentItems = 
-            scanSingleSegmentWithPagination(tableName, segment, 
TOTAL_SEGMENTS, SCAN_LIMIT, true, false);
+        List<Map<String, AttributeValue>> segmentItems =
+                scanSingleSegmentWithPagination(tableName, segment, 
TOTAL_SEGMENTS, SCAN_LIMIT,
+                        true, false, 0);
         LOGGER.info("Segment {} scan completed with {} items", segment, 
segmentItems.size());
         return segmentItems;
     }
diff --git 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java
index c66b988..0dd7a78 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java
@@ -40,13 +40,15 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
 import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
 import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
-import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
 import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
 import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
 import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
 import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
 
 import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 
@@ -87,6 +89,18 @@ public class ScanIndex3IT {
         createTableAndInsertData();
     }
 
+    private static void executeBatchWrite(String tableName, List<WriteRequest> 
batch) {
+        if (batch.isEmpty()) {
+            return;
+        }
+        Map<String, List<WriteRequest>> requestItems = new HashMap<>();
+        requestItems.put(tableName, new ArrayList<>(batch));
+        BatchWriteItemRequest batchRequest =
+                
BatchWriteItemRequest.builder().requestItems(requestItems).build();
+        phoenixDBClientV2.batchWriteItem(batchRequest);
+        dynamoDbClient.batchWriteItem(batchRequest);
+    }
+
     private static void createTableAndInsertData() {
         CreateTableRequest createTableRequest = 
DDLTestUtils.getCreateTableRequest(
                 TABLE_NAME, "pk", ScalarAttributeType.S, "sk", 
ScalarAttributeType.N);
@@ -96,6 +110,7 @@ public class ScanIndex3IT {
         phoenixDBClientV2.createTable(createTableRequest);
         dynamoDbClient.createTable(createTableRequest);
 
+        List<WriteRequest> batch = new ArrayList<>();
         for (int i = 0; i < NUM_RECORDS; i++) {
             Map<String, AttributeValue> item = new HashMap<>();
             item.put("pk", AttributeValue.builder().s("pk_" + (i % 
100)).build());
@@ -126,10 +141,15 @@ public class ScanIndex3IT {
             nestedList.add(AttributeValue.builder().n(String.valueOf(i % 
40)).build());
             nestedList.add(AttributeValue.builder().ss("set_in_list_" + (i % 
3), "list_common").build());
             item.put("itms", AttributeValue.builder().l(nestedList).build());
-            
-            PutItemRequest putRequest = 
PutItemRequest.builder().tableName(TABLE_NAME).item(item).build();
-            phoenixDBClientV2.putItem(putRequest);
-            dynamoDbClient.putItem(putRequest);
+
+            WriteRequest writeRequest =
+                    
WriteRequest.builder().putRequest(PutRequest.builder().item(item).build())
+                            .build();
+            batch.add(writeRequest);
+            if (batch.size() >= 25 || i == NUM_RECORDS - 1) {
+                executeBatchWrite(TABLE_NAME, batch);
+                batch.clear();
+            }
         }
 
         for (int i = 0; i < NUM_RECORDS; i++) {
diff --git 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan1IT.java 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan1IT.java
new file mode 100644
index 0000000..70fb38a
--- /dev/null
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan1IT.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.ddb;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+
+/**
+ * Segment scan tests - Part 1.
+ */
+@RunWith(Parameterized.class)
+public class SegmentScan1IT extends SegmentScanIT {
+
+    public SegmentScan1IT(String hashKeyName, ScalarAttributeType hashKeyType, 
String sortKeyName,
+            ScalarAttributeType sortKeyType, boolean useFilter) {
+        super(hashKeyName, hashKeyType, sortKeyName, sortKeyType, useFilter);
+    }
+
+    @Parameterized.Parameters(name = "Hash_{1}_Sort_{3}_Filter_{4}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                // Hash key only
+                {"pk", ScalarAttributeType.S, null, null, true},
+                {"pk", ScalarAttributeType.N, null, null, false},
+                {"pk", ScalarAttributeType.B, null, null, true},
+
+                // String Hash Key combinations
+                {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.S, 
false},
+                {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.N, 
true},
+                {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.B, 
false},
+
+                // Number Hash Key combinations
+                {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.S, 
true},
+                {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.N, 
false},
+                {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.B, 
true}
+        });
+    }
+}
diff --git 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan2IT.java 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan2IT.java
new file mode 100644
index 0000000..28e922c
--- /dev/null
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan2IT.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.ddb;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
+
+/**
+ * Segment scan tests - Part 2.
+ */
+@RunWith(Parameterized.class)
+public class SegmentScan2IT extends SegmentScanIT {
+
+    public SegmentScan2IT(String hashKeyName, ScalarAttributeType hashKeyType, 
String sortKeyName,
+            ScalarAttributeType sortKeyType, boolean useFilter) {
+        super(hashKeyName, hashKeyType, sortKeyName, sortKeyType, useFilter);
+    }
+
+    @Parameterized.Parameters(name = "Hash_{1}_Sort_{3}_Filter_{4}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                // Binary Hash Key combinations
+                {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.S, 
false},
+                {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.N, 
true},
+                {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.B, 
false},
+
+                // Additional combinations
+                {"pk", ScalarAttributeType.S, null, null, true},
+                {"pk", ScalarAttributeType.N, null, null, false},
+                {"pk", ScalarAttributeType.B, null, null, true},
+                {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.S, 
false},
+                {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.N, 
true},
+                {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.B, 
false}
+        });
+    }
+}
diff --git 
a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScanIT.java 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScanIT.java
index f325de7..8a168c3 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScanIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScanIT.java
@@ -19,91 +19,48 @@ package org.apache.phoenix.ddb;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
 import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
 import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
 
 /**
- * Comprehensive parametrized test for segment scan functionality.
- * 
- * This test verifies that segment scans work correctly across different key 
configurations
- * ,segment counts, limits and filters. For each test run:
- * 1. Creates a table with specified key schema
- * 2. Inserts items while periodically splitting the underlying HBase table
- * 3. Performs both full scan and segment scan with pagination, limits and 
filters
- * 4. Verifies that both approaches return identical results to ddb
+ * Abstract base class for segment scan tests.
  */
-@RunWith(Parameterized.class)
-public class SegmentScanIT extends BaseSegmentScanIT {
-
-    private final String hashKeyName;
-    private final ScalarAttributeType hashKeyType;
-    private final String sortKeyName;
-    private final ScalarAttributeType sortKeyType;
-    private final int totalSegments;
-    private final int scanLimit;
-    private final boolean useFilter;
-    protected static final int TOTAL_ITEMS = 2000;
-    protected static final int SPLIT_FREQUENCY = 500;
+public abstract class SegmentScanIT extends BaseSegmentScanIT {
+
+    protected final String hashKeyName;
+    protected final ScalarAttributeType hashKeyType;
+    protected final String sortKeyName;
+    protected final ScalarAttributeType sortKeyType;
+    protected final int totalSegments;
+    protected final boolean useFilter;
+    protected final int filterNum;
+    protected static final int TOTAL_ITEMS = 25000;
+    protected static final int SPLIT_FREQUENCY = 4000;
 
     public SegmentScanIT(String hashKeyName, ScalarAttributeType hashKeyType, 
                          String sortKeyName, ScalarAttributeType sortKeyType, 
-                         int totalSegments, int scanLimit, boolean useFilter) {
+                         boolean useFilter) {
         this.hashKeyName = hashKeyName;
         this.hashKeyType = hashKeyType;
         this.sortKeyName = sortKeyName;
         this.sortKeyType = sortKeyType;
-        this.totalSegments = totalSegments;
-        this.scanLimit = scanLimit;
+        Random random = new Random();
+        this.totalSegments = random.nextInt(8) + 1;
         this.useFilter = useFilter;
+        this.filterNum = useFilter ? random.nextInt(15) + 1 : 0;
     }
 
-    @Parameterized.Parameters(name = 
"Hash_{1}_Sort_{3}_Segments_{4}_Limit_{5}_Filter_{6}")
-    public static Collection<Object[]> data() {
-        return Arrays.asList(new Object[][] {
-            // Hash key only
-            {"pk", ScalarAttributeType.S, null, null, 3, 34, true},
-            {"pk", ScalarAttributeType.N, null, null, 4, 52, false},
-            {"pk", ScalarAttributeType.B, null, null, 2, 41, true},
-
-            // String Hash Key combinations
-            {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.S, 1, 28, 
false},
-            {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.N, 2, 59, 
true},
-            {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.B, 4, 45, 
false},
-
-            // Number Hash Key combinations
-            {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.S, 1, 23, 
true},
-            {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.N, 2, 37, 
false},
-            {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.B, 3, 51, 
true},
-
-            // Binary Hash Key combinations
-            {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.S, 2, 29, 
false},
-            {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.N, 4, 46, 
true},
-            {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.B, 3, 33, 
false},
-
-            // Total Segments > #regions
-            {"pk", ScalarAttributeType.S, null, null, 5, 58, true},
-            {"pk", ScalarAttributeType.N, null, null, 6, 25, false},
-            {"pk", ScalarAttributeType.B, null, null, 7, 42, true},
-            {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.S, 5, 31, 
false},
-            {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.N, 6, 55, 
true},
-            {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.B, 7, 38, 
false},
-        });
-    }
-
-
-    @Test(timeout = 300000)
+    @Test(timeout = 600000)
     public void testSegmentScan() throws Exception {
-        final String tableName = 
testName.getMethodName().replaceAll("[\\[\\]]", "_");
+        final String tableName = 
testName.getMethodName().replaceAll("[\\[\\]]", "_")
+                + "_" + generateRandomString(7);
 
         // Create table
         CreateTableRequest createTableRequest = 
DDLTestUtils.getCreateTableRequest(
@@ -112,33 +69,45 @@ public class SegmentScanIT extends BaseSegmentScanIT {
         dynamoDbClient.createTable(createTableRequest);
 
         // Insert items with periodic table splitting
-        insertItemsWithSplitting(tableName, hashKeyName, hashKeyType, 
sortKeyName, sortKeyType, 
-                                TOTAL_ITEMS, SPLIT_FREQUENCY, 100);
+        insertItemsWithSplitting(tableName, hashKeyName, hashKeyType, 
sortKeyName, sortKeyType,
+                TOTAL_ITEMS, SPLIT_FREQUENCY, 200);
+
+        Random random = new Random();
 
         // Perform full scan with pagination
-        List<Map<String, AttributeValue>> fullScanItemsPhoenix = 
performFullScanWithPagination(phoenixDBClientV2, tableName, useFilter);
-        List<Map<String, AttributeValue>> fullScanItemsDDB = 
performFullScanWithPagination(dynamoDbClient, tableName, useFilter);
+        List<Map<String, AttributeValue>> fullScanItemsPhoenix =
+                performFullScanWithPagination(phoenixDBClientV2, tableName, 
useFilter, filterNum,
+                        random.nextInt(150) + 1);
+        List<Map<String, AttributeValue>> fullScanItemsDDB =
+                performFullScanWithPagination(dynamoDbClient, tableName, 
useFilter, filterNum,
+                        random.nextInt(200) + 1);
 
         // Perform segment scan serially on all segments with pagination
-        List<Map<String, AttributeValue>> segmentScanItems = 
performSegmentScanWithPagination(tableName, useFilter);
+        List<Map<String, AttributeValue>> segmentScanItems =
+                performSegmentScanWithPagination(tableName, useFilter, 
filterNum);
 
         // Verify results
-        TestUtils.verifyItemsEqual(fullScanItemsDDB, fullScanItemsPhoenix, 
hashKeyName, sortKeyName);
-        TestUtils.verifyItemsEqual(fullScanItemsPhoenix, segmentScanItems, 
hashKeyName, sortKeyName);
+        TestUtils.verifyItemsEqual(fullScanItemsDDB, fullScanItemsPhoenix, 
hashKeyName,
+                sortKeyName);
+        TestUtils.verifyItemsEqual(fullScanItemsPhoenix, segmentScanItems, 
hashKeyName,
+                sortKeyName);
     }
 
     /**
      * Perform segment scan across all segments with pagination.
      */
-    private List<Map<String, AttributeValue>> 
performSegmentScanWithPagination(String tableName, boolean useFilter)
-            throws SQLException {
+    private List<Map<String, AttributeValue>> 
performSegmentScanWithPagination(String tableName,
+            boolean useFilter, int filterNum) throws SQLException {
         List<Map<String, AttributeValue>> allItems = new ArrayList<>();
+        Random random = new Random();
         
         // Scan each segment sequentially
         int numRegions = TestUtils.getNumberOfTableRegions(testConnection, 
tableName);
         for (int segment = 0; segment < totalSegments; segment++) {
-            List<Map<String, AttributeValue>> segmentItems = 
-                scanSingleSegmentWithPagination(tableName, segment, 
totalSegments, scanLimit, false, useFilter);
+            int scanLimit = random.nextInt(200) + 1;
+            List<Map<String, AttributeValue>> segmentItems =
+                    scanSingleSegmentWithPagination(tableName, segment, 
totalSegments, scanLimit,
+                            false, useFilter, filterNum);
             allItems.addAll(segmentItems);
             if (segment < numRegions && !useFilter) {
                 Assert.assertFalse(segmentItems.isEmpty());
@@ -146,4 +115,14 @@ public class SegmentScanIT extends BaseSegmentScanIT {
         }
         return allItems;
     }
-} 
\ No newline at end of file
+
+    private String generateRandomString(int length) {
+        String chars = "abcdefghijklmnopqrstuvwxyz0123456789-_.";
+        Random random = new Random();
+        StringBuilder sb = new StringBuilder(length);
+        for (int i = 0; i < length; i++) {
+            sb.append(chars.charAt(random.nextInt(chars.length())));
+        }
+        return sb.toString();
+    }
+}

Reply via email to