This is an automated email from the ASF dual-hosted git repository. palashc pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new be04ae882f PHOENIX-7642 : Add CDC Stream creation datetime to stream name (#2194) be04ae882f is described below commit be04ae882f8d1f4d17c3327a0d66d1cbcd91bab6 Author: Palash Chauhan <palashc...@gmail.com> AuthorDate: Tue Jun 17 19:21:28 2025 -0700 PHOENIX-7642 : Add CDC Stream creation datetime to stream name (#2194) Co-authored-by: Palash Chauhan <p.chau...@pchauha-ltmgv47.internal.salesforce.com> --- .../java/org/apache/phoenix/schema/MetaDataClient.java | 8 ++++++-- .../src/main/java/org/apache/phoenix/util/CDCUtil.java | 15 +++++++++++++-- .../src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java | 7 ++++--- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 54b3c7f254..96f11462c8 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -2067,7 +2067,9 @@ public class MetaDataClient { // create Stream with ENABLING status long cdcIndexTimestamp = CDCUtil.getCDCCreationTimestamp(connection.getTable(tableName)); String streamStatusSQL = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)"; - String streamName = String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcObjName, cdcIndexTimestamp); + String streamName = String.format(CDC_STREAM_NAME_FORMAT, + tableName, cdcObjName, cdcIndexTimestamp, + CDCUtil.getCDCCreationUTCDateTime(cdcIndexTimestamp)); try (PreparedStatement ps = connection.prepareStatement(streamStatusSQL)) { ps.setString(1, tableName); ps.setString(2, streamName); @@ -4020,7 +4022,9 @@ public class MetaDataClient { // Mark CDC Stream as Disabled long cdcIndexTimestamp = connection.getTable(indexName).getTimeStamp(); String streamStatusSQL = "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)"; - String streamName = String.format(CDC_STREAM_NAME_FORMAT, parentTableName, cdcTableName, cdcIndexTimestamp); + String streamName = String.format(CDC_STREAM_NAME_FORMAT, + parentTableName, cdcTableName, cdcIndexTimestamp, + CDCUtil.getCDCCreationUTCDateTime(cdcIndexTimestamp)); try (PreparedStatement ps = connection.prepareStatement(streamStatusSQL)) { ps.setString(1, parentTableName); ps.setString(2, streamName); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java index 4dca1eb03c..3117c5cebc 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java @@ -20,12 +20,16 @@ package org.apache.phoenix.util; import java.sql.SQLException; import java.sql.Types; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.Base64; +import java.util.Date; import java.util.HashSet; import java.util.Map; import java.util.NavigableSet; import java.util.Set; import java.util.StringTokenizer; +import java.util.TimeZone; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; @@ -41,8 +45,8 @@ import org.bson.RawBsonDocument; public class CDCUtil { public static final String CDC_INDEX_PREFIX = "PHOENIX_CDC_INDEX_"; - // phoenix/cdc/stream/{tableName}/{cdc object name}/{cdc index timestamp} - public static String CDC_STREAM_NAME_FORMAT = "phoenix/cdc/stream/%s/%s/%d"; + // phoenix/cdc/stream/{tableName}/{cdc object name}/{cdc index timestamp}/{cdc index creation datetime} + public static String CDC_STREAM_NAME_FORMAT = "phoenix/cdc/stream/%s/%s/%d/%s"; /** * Make a set of CDC change scope enums from the given string containing comma separated scope @@ -180,4 +184,11 @@ public class CDCUtil { } return -1; } + + public static String getCDCCreationUTCDateTime(long timestamp) { + Date date = new Date(timestamp); + DateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"); + format.setTimeZone(TimeZone.getTimeZone("Etc/UTC")); + return format.format(date); + } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java index 3fb604d8d0..4956245b1c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java @@ -1019,9 +1019,10 @@ public class CDCBaseIT extends ParallelStatsDisabledIT { */ public String getStreamName(Connection conn, String tableName, String cdcName) throws SQLException { - return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName, - CDCUtil.getCDCCreationTimestamp( - conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName))); + long creationTS = CDCUtil.getCDCCreationTimestamp( + conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)); + return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName, creationTS, + CDCUtil.getCDCCreationUTCDateTime(creationTS)); } /**