This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
     new d3c5d98c49 PHOENIX-7636 CDC on table with case-sensitive pk columns fails to read change records (#2186)
d3c5d98c49 is described below

commit d3c5d98c49ec6c7ec4b8c22482673057cc49d487
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Wed Jun 11 17:16:49 2025 -0700

    PHOENIX-7636 CDC on table with case-sensitive pk columns fails to read change records (#2186)
---
 .../org/apache/phoenix/schema/MetaDataClient.java | 11 ++--
 .../java/org/apache/phoenix/end2end/Bson3IT.java  | 73 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 4 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index a1cbfa10b4..54b3c7f254 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -2022,11 +2022,14 @@ public class MetaDataClient {
       int pkOffset = dataTable.getBucketNum() != null ? 1 : 0;
       for (int i = pkOffset; i < pkColumns.size(); ++i) {
         PColumn pcol = pkColumns.get(i);
-        columnDefs.add(FACTORY.columnDef(FACTORY.columnName(pcol.getName().getString()),
-          pcol.getDataType().getSqlTypeName(), false, null, false, pcol.getMaxLength(),
-          pcol.getScale(), false, pcol.getSortOrder(), "", null, false));
+        columnDefs.add(
+          FACTORY.columnDef(FACTORY.columnName("\"" + pcol.getName().getString() + "\""),
+            pcol.getDataType().getSqlTypeName(), false, null, false,
+            pcol.getMaxLength(), pcol.getScale(), false,
+            pcol.getSortOrder(), "", null, false));
         pkColumnDefs.add(FACTORY.columnDefInPkConstraint(FACTORY.columnName(
-          pcol.getName().getString()), pcol.getSortOrder(), pcol.isRowTimestamp()));
+          "\"" + pcol.getName().getString() + "\""), pcol.getSortOrder(),
+          pcol.isRowTimestamp()));
       }
       columnDefs.add(FACTORY.columnDef(FACTORY.columnName(QueryConstants.CDC_JSON_COL_NAME),
         PVarchar.INSTANCE.getSqlTypeName(), false, null, true, null,
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
index 8a2070e183..1adad43dc3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
@@ -32,6 +32,7 @@ import org.bson.BsonBinary;
 import org.bson.BsonBoolean;
 import org.bson.BsonDocument;
 import org.bson.BsonDouble;
+import org.bson.BsonInt32;
 import org.bson.BsonNull;
 import org.bson.BsonString;
 import org.bson.RawBsonDocument;
@@ -57,6 +58,7 @@ import java.util.Properties;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -1860,4 +1862,75 @@ public class Bson3IT extends ParallelStatsDisabledIT {
     }
   }
 
+  @Test
+  public void testCDCWithCaseSenstitiveTableAndPks() throws Exception {
+    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    String tableName = "XYZ.\"test.table\"";
+    String cdcName = "XYZ.\"CDC_test.table\"";
+    String cdcNameWithoutSchema = "\"CDC_test.table\"";
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      String ddl = "CREATE TABLE " + tableName
+        + " (\"hk\" VARCHAR NOT NULL, COL BSON CONSTRAINT pk PRIMARY KEY(\"hk\"))";
+      conn.createStatement().execute(ddl);
+
+      String cdcDdl =
"CREATE CDC " + cdcNameWithoutSchema + " ON " + tableName; + conn.createStatement().execute(cdcDdl); + + String alterDdl = "ALTER TABLE " + tableName + + " SET SCHEMA_VERSION = 'NEW_AND_OLD_IMAGES'"; + conn.createStatement().execute(alterDdl); + + Timestamp ts1 = new Timestamp(System.currentTimeMillis()); + Thread.sleep(100); + + BsonDocument bsonDocument = new BsonDocument() + .append("field1", new BsonString("value1")) + .append("field2", new BsonInt32(42)) + .append("field3", new BsonBoolean(true)); + + PreparedStatement stmt = conn.prepareStatement( + "UPSERT INTO " + tableName + " VALUES (?,?)"); + stmt.setString(1, "key1"); + stmt.setObject(2, bsonDocument); + stmt.executeUpdate(); + conn.commit(); + + Thread.sleep(100); + Timestamp ts2 = new Timestamp(System.currentTimeMillis()); + + ResultSet rs = conn.createStatement().executeQuery( + "SELECT DISTINCT PARTITION_ID() FROM " + cdcName); + assertTrue("Expected one partition", rs.next()); + String partitionId = rs.getString(1); + assertFalse("Expected only one partition", rs.next()); + + String cdcQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName + + " WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() >= ? AND " + + "PHOENIX_ROW_TIMESTAMP() <= ?"; + PreparedStatement ps = conn.prepareStatement(cdcQuery); + ps.setString(1, partitionId); + ps.setTimestamp(2, ts1); + ps.setTimestamp(3, ts2); + + rs = ps.executeQuery(); + + assertTrue("Expected at least one CDC record", rs.next()); + + String cdcVal = rs.getString(3); + Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal, Map.class); + + Map<String, Object> preImage = (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE); + assertNull("Pre-image should be null for first insert", preImage.get("COL")); + + Map<String, Object> postImage = (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE); + String encodedBytes = (String) postImage.get("COL"); + byte[] bytes = Base64.getDecoder().decode(encodedBytes); + RawBsonDocument actualDoc = new RawBsonDocument(bytes, 0, bytes.length); + assertEquals("Post-image BSON document should match inserted document", bsonDocument, + actualDoc); + + assertFalse("Should only have one CDC record", rs.next()); + } + } + }