This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new d3c5d98c49 PHOENIX-7636 CDC on table with case-sensitive pk columns 
fails to read change records (#2186)
d3c5d98c49 is described below

commit d3c5d98c49ec6c7ec4b8c22482673057cc49d487
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Wed Jun 11 17:16:49 2025 -0700

    PHOENIX-7636 CDC on table with case-sensitive pk columns fails to read 
change records (#2186)
---
 .../org/apache/phoenix/schema/MetaDataClient.java  | 11 ++--
 .../java/org/apache/phoenix/end2end/Bson3IT.java   | 73 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 4 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index a1cbfa10b4..54b3c7f254 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -2022,11 +2022,14 @@ public class MetaDataClient {
         int pkOffset = dataTable.getBucketNum() != null ? 1 : 0;
         for (int i = pkOffset; i < pkColumns.size(); ++i) {
             PColumn pcol = pkColumns.get(i);
-            
columnDefs.add(FACTORY.columnDef(FACTORY.columnName(pcol.getName().getString()),
-                    pcol.getDataType().getSqlTypeName(), false, null, false, 
pcol.getMaxLength(),
-                    pcol.getScale(), false, pcol.getSortOrder(), "", null, 
false));
+            columnDefs.add(
+                    FACTORY.columnDef(FACTORY.columnName("\"" + 
pcol.getName().getString() + "\""),
+                            pcol.getDataType().getSqlTypeName(), false, null, 
false,
+                            pcol.getMaxLength(), pcol.getScale(), false,
+                            pcol.getSortOrder(), "", null, false));
             
pkColumnDefs.add(FACTORY.columnDefInPkConstraint(FACTORY.columnName(
-                    pcol.getName().getString()), pcol.getSortOrder(), 
pcol.isRowTimestamp()));
+                            "\"" + pcol.getName().getString() + "\""), 
pcol.getSortOrder(),
+                    pcol.isRowTimestamp()));
         }
         
columnDefs.add(FACTORY.columnDef(FACTORY.columnName(QueryConstants.CDC_JSON_COL_NAME),
                 PVarchar.INSTANCE.getSqlTypeName(), false, null, true, null,
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
index 8a2070e183..1adad43dc3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
@@ -32,6 +32,7 @@ import org.bson.BsonBinary;
 import org.bson.BsonBoolean;
 import org.bson.BsonDocument;
 import org.bson.BsonDouble;
+import org.bson.BsonInt32;
 import org.bson.BsonNull;
 import org.bson.BsonString;
 import org.bson.RawBsonDocument;
@@ -57,6 +58,7 @@ import java.util.Properties;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -1860,4 +1862,75 @@ public class Bson3IT extends ParallelStatsDisabledIT {
     }
   }
 
+  @Test
+  public void testCDCWithCaseSenstitiveTableAndPks() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    String tableName = "XYZ.\"test.table\"";
+    String cdcName = "XYZ.\"CDC_test.table\"";
+    String cdcNameWithoutSchema = "\"CDC_test.table\"";
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      String ddl = "CREATE TABLE " + tableName +
+              " (\"hk\" VARCHAR NOT NULL, COL BSON CONSTRAINT pk PRIMARY 
KEY(\"hk\"))";
+      conn.createStatement().execute(ddl);
+
+      String cdcDdl = "CREATE CDC " + cdcNameWithoutSchema + " ON " + 
tableName;
+      conn.createStatement().execute(cdcDdl);
+
+      String alterDdl = "ALTER TABLE " + tableName
+              + " SET SCHEMA_VERSION = 'NEW_AND_OLD_IMAGES'";
+      conn.createStatement().execute(alterDdl);
+
+      Timestamp ts1 = new Timestamp(System.currentTimeMillis());
+      Thread.sleep(100);
+
+      BsonDocument bsonDocument = new BsonDocument()
+              .append("field1", new BsonString("value1"))
+              .append("field2", new BsonInt32(42))
+              .append("field3", new BsonBoolean(true));
+
+      PreparedStatement stmt = conn.prepareStatement(
+              "UPSERT INTO " + tableName + " VALUES (?,?)");
+      stmt.setString(1, "key1");
+      stmt.setObject(2, bsonDocument);
+      stmt.executeUpdate();
+      conn.commit();
+
+      Thread.sleep(100);
+      Timestamp ts2 = new Timestamp(System.currentTimeMillis());
+
+      ResultSet rs = conn.createStatement().executeQuery(
+              "SELECT DISTINCT PARTITION_ID() FROM " + cdcName);
+      assertTrue("Expected one partition", rs.next());
+      String partitionId = rs.getString(1);
+      assertFalse("Expected only one partition", rs.next());
+
+      String cdcQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + 
cdcName
+              + " WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() >= ? 
AND "
+              + "PHOENIX_ROW_TIMESTAMP() <= ?";
+      PreparedStatement ps = conn.prepareStatement(cdcQuery);
+      ps.setString(1, partitionId);
+      ps.setTimestamp(2, ts1);
+      ps.setTimestamp(3, ts2);
+
+      rs = ps.executeQuery();
+
+      assertTrue("Expected at least one CDC record", rs.next());
+
+      String cdcVal = rs.getString(3);
+      Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal, Map.class);
+
+      Map<String, Object> preImage = (Map<String, Object>) 
map.get(QueryConstants.CDC_PRE_IMAGE);
+      assertNull("Pre-image should be null for first insert", 
preImage.get("COL"));
+
+      Map<String, Object> postImage = (Map<String, Object>) 
map.get(QueryConstants.CDC_POST_IMAGE);
+      String encodedBytes = (String) postImage.get("COL");
+      byte[] bytes = Base64.getDecoder().decode(encodedBytes);
+      RawBsonDocument actualDoc = new RawBsonDocument(bytes, 0, bytes.length);
+      assertEquals("Post-image BSON document should match inserted document", 
bsonDocument,
+              actualDoc);
+
+      assertFalse("Should only have one CDC record", rs.next());
+    }
+  }
+
 }

Reply via email to