This is an automated email from the ASF dual-hosted git repository.

virajjasani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/phoenix-adapters.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e3b112  eventId for change stream record
6e3b112 is described below

commit 6e3b112a972dbf466ad4184abdf9f8ca4a5eb104
Author: Palash Chauhan <[email protected]>
AuthorDate: Tue May 12 10:50:22 2026 -0700

    eventId for change stream record
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../phoenix/ddb/service/GetRecordsService.java     | 10 ++++---
 .../java/org/apache/phoenix/ddb/GetRecordsIT.java  |  5 ++++
 .../org/apache/phoenix/ddb/ReturnItemsLimitIT.java |  2 +-
 .../java/org/apache/phoenix/ddb/TestUtils.java     | 34 +++++++++++++++++++++-
 .../org/apache/phoenix/ddb/utils/ApiMetadata.java  |  1 +
 .../phoenix/ddb/utils/DdbAdapterCdcUtils.java      | 24 +++++++++++++++
 6 files changed, 70 insertions(+), 6 deletions(-)

diff --git a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetRecordsService.java b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetRecordsService.java
index 84ec289..4a49500 100644
--- a/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetRecordsService.java
+++ b/phoenix-ddb-rest/src/main/java/org/apache/phoenix/ddb/service/GetRecordsService.java
@@ -20,7 +20,6 @@ import java.sql.Date;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -88,8 +87,9 @@ public class GetRecordsService {
                     lastTs = ts;
                     lastOffset=0;
                 }
-                record = getStreamRecord(rs, pIter.getStreamType(), pkCols,
-                        DdbAdapterCdcUtils.getSequenceNumber(lastTs, lastOffset));
+                String seqNum = DdbAdapterCdcUtils.getSequenceNumber(lastTs, lastOffset);
+                record = getStreamRecord(rs, pIter.getStreamType(), pkCols, seqNum,
+                        pIter.getTableName(), pIter.getPartitionId());
                 records.add(record);
                 count++;
                 bytesSize +=
@@ -156,7 +156,8 @@ public class GetRecordsService {
      * rs --> timestamp, pk1, (pk2), cdcJson
      */
     private static Map<String, Object> getStreamRecord(ResultSet rs, String streamType, List<PColumn> pkCols,
-                                          String seqNum) throws SQLException, JsonProcessingException {
+                                          String seqNum, String tableName, String partitionId)
+            throws SQLException, JsonProcessingException {
         Map<String, Object> streamRecord = new HashMap<>();
         streamRecord.put(ApiMetadata.STREAM_VIEW_TYPE, streamType);
         streamRecord.put(ApiMetadata.SEQUENCE_NUMBER, seqNum);
@@ -211,6 +212,7 @@ public class GetRecordsService {
         } else {
             record.put(ApiMetadata.EVENT_NAME, "MODIFY");
         }
+        record.put(ApiMetadata.EVENT_ID, DdbAdapterCdcUtils.getEventId(tableName, partitionId, seqNum));
         return record;
     }
 }
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/GetRecordsIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/GetRecordsIT.java
index cd0a85c..5595070 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/GetRecordsIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/GetRecordsIT.java
@@ -486,5 +486,10 @@ public class GetRecordsIT {
         Assert.assertEquals(ddbRecord.dynamodb().newImage(), phoenixRecord.dynamodb().newImage());
         Assert.assertTrue(ddbRecord.dynamodb().sizeBytes() > 0);
         Assert.assertTrue(phoenixRecord.dynamodb().sizeBytes() > 0);
+        // eventID must be present and be a 32-char hex string
+        Assert.assertNotNull("eventID must be present", phoenixRecord.eventID());
+        Assert.assertTrue("eventID must be 32-char hex",
+                phoenixRecord.eventID().matches("[0-9a-f]{32}"));
     }
+
 }
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ReturnItemsLimitIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ReturnItemsLimitIT.java
index c6388f1..409acfc 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ReturnItemsLimitIT.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ReturnItemsLimitIT.java
@@ -508,7 +508,7 @@ public class ReturnItemsLimitIT {
         // Verify that both services returned the same number of records
         Assert.assertEquals("Phoenix and DynamoDB should return same number of records",
                 ddbAllResponses.size(), phoenixAllResponses.size());
-        TestUtils.validateRecords(ddbAllResponses, phoenixAllResponses);
+        TestUtils.validateRecords(phoenixAllResponses, ddbAllResponses);
     }
 
     /**
diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
index 75cf914..f1a5fe7 100644
--- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
+++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/TestUtils.java
@@ -23,8 +23,10 @@ import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -223,7 +225,7 @@ public class TestUtils {
     }
 
     /**
-     * Validate change records.
+     * Validate change records. Ensure first argument is Phoenix records.
      */
     public static void validateRecords(List<Record> phoenixRecords, List<Record> dynamoRecords) {
         Assert.assertEquals("Stream record counts should match between Phoenix and DynamoDB",
@@ -250,6 +252,36 @@ public class TestUtils {
             Assert.assertTrue("Phoenix record size should be greater than 0 for record " + i,
                     pr.dynamodb().sizeBytes() > 0);
         }
+        assertEventIdsUnique(phoenixRecords);
+    }
+
+    /**
+     * Assert that eventIDs are present, well-formed, and unique across distinct events.
+     * Records with the same sequenceNumber (re-read via AT_SEQUENCE_NUMBER) must produce
+     * the same eventID (deterministic). Records with different sequenceNumbers must produce
+     * different eventIDs (unique).
+     */
+    public static void assertEventIdsUnique(List<Record> records) {
+        // First pass: build seqNum -> eventID map, verifying format
+        Map<String, String> seqToEventId = new HashMap<>();
+        for (Record record : records) {
+            Assert.assertNotNull("eventID must be present", record.eventID());
+            Assert.assertTrue("eventID must be 32-char hex",
+                    record.eventID().matches("[0-9a-f]{32}"));
+            String seqNum = record.dynamodb().sequenceNumber();
+            seqToEventId.put(seqNum, record.eventID());
+        }
+
+        // Second pass: verify determinism and uniqueness
+        Set<String> seenEventIds = new HashSet<>();
+        for (Record record : records) {
+            String seqNum = record.dynamodb().sequenceNumber();
+            Assert.assertEquals("eventID must be stable for same sequenceNumber",
+                    seqToEventId.get(seqNum), record.eventID());
+            seenEventIds.add(record.eventID());
+        }
+        Assert.assertEquals("eventIDs must be unique across distinct events",
+                seqToEventId.size(), seenEventIds.size());
     }
 
     /**
diff --git a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/ApiMetadata.java b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/ApiMetadata.java
index ce102e1..9df9dfa 100644
--- a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/ApiMetadata.java
+++ b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/ApiMetadata.java
@@ -155,6 +155,7 @@ public class ApiMetadata {
     public static final String OLD_IMAGE = "OldImage";
     public static final String KEYS = "Keys";
     public static final String SIZE_BYTES = "SizeBytes";
+    public static final String EVENT_ID = "eventID";
     public static final String USER_IDENTITY = "userIdentity";
     public static final String TYPE = "Type";
     public static final String PRINCIPAL_ID = "PrincipalId";
diff --git a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/DdbAdapterCdcUtils.java b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/DdbAdapterCdcUtils.java
index e454be2..40614c5 100644
--- a/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/DdbAdapterCdcUtils.java
+++ b/phoenix-ddb-utils/src/main/java/org/apache/phoenix/ddb/utils/DdbAdapterCdcUtils.java
@@ -8,6 +8,10 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.util.CDCUtil;
 
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -189,6 +193,26 @@ public class DdbAdapterCdcUtils {
         return timestamp + String.format("%0" + OFFSET_LENGTH + "d", offset);
     }
 
+    /**
+     * Generate a globally unique, deterministic eventID for a change stream record.
+     * Formula: md5Hex(tableName + "|" + partitionId + "|" + sequenceNumber)
+     *
+     * @param tableName the table name from the shard iterator
+     * @param partitionId the partition (shard) ID
+     * @param sequenceNumber the per-event sequence number
+     * @return 32-char lowercase hex string
+     */
+    public static String getEventId(String tableName, String partitionId, String sequenceNumber) {
+        try {
+            String input = tableName + "|" + partitionId + "|" + sequenceNumber;
+            MessageDigest md = MessageDigest.getInstance("MD5");
+            byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
+            return String.format("%032x", new BigInteger(1, digest));
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("MD5 algorithm not available", e);
+        }
+    }
+
     /**
      * Parse the provided stream name and return a particular component.
      * @param streamName stream name

Reply via email to