This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 21109328f72 [Java] Fix CassandraIO ReadFn to quote column names for reserved keywords (#36459)
21109328f72 is described below

commit 21109328f725bb9136316455db07507144df719f
Author: Suvrat Acharya <[email protected]>
AuthorDate: Wed Dec 3 02:11:02 2025 +0530

    [Java] Fix CassandraIO ReadFn to quote column names for reserved keywords (#36459)
    
    * Fix CassandraIO ReadFn to quote column names for reserved keywords
    
    * Minor fixes
---
 .../org/apache/beam/sdk/io/cassandra/ReadFn.java   |  10 +
 .../beam/sdk/io/cassandra/CassandraIOTest.java     | 374 +++++++++++++++++++++
 2 files changed, 384 insertions(+)

diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
index 678c72d42ff..8f16e729bc8 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
@@ -50,6 +50,7 @@ class ReadFn<T> extends DoFn<Read<T>, T> {
           session.getCluster().getMetadata().getKeyspace(read.keyspace().get())
               .getTable(read.table().get()).getPartitionKey().stream()
               .map(ColumnMetadata::getName)
+              .map(ReadFn::quoteIdentifier)
               .collect(Collectors.joining(","));
 
       String query = generateRangeQuery(read, partitionKey, read.ringRanges() != null);
@@ -148,4 +149,13 @@ class ReadFn<T> extends DoFn<Read<T>, T> {
   private static String getJoinerClause(String queryString) {
     return queryString.toUpperCase().contains("WHERE") ? " AND " : " WHERE ";
   }
+
+  static String quoteIdentifier(String identifier) {
+    if (identifier == null) {
+      return null;
+    }
+    // Escape any existing double quotes by doubling them
+    String escaped = identifier.replace("\"", "\"\"");
+    return "\"" + escaped + "\"";
+  }
 }
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
index 747f803ea46..df52421db23 100644
--- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -844,4 +844,378 @@ public class CassandraIOTest implements Serializable {
   /** Simple Cassandra entity used in write tests. */
   @Table(name = CASSANDRA_TABLE_WRITE, keyspace = CASSANDRA_KEYSPACE)
   static class ScientistWrite extends Scientist {}
+
+  /** Test the quoteIdentifier utility method with various inputs. */
+  @Test
+  public void testQuoteIdentifier() {
+    // Test normal identifiers
+    assertEquals("\"normal_column\"", ReadFn.quoteIdentifier("normal_column"));
+    assertEquals("\"myTable\"", ReadFn.quoteIdentifier("myTable"));
+    assertEquals("\"column123\"", ReadFn.quoteIdentifier("column123"));
+
+    // Test reserved keywords
+    assertEquals("\"true\"", ReadFn.quoteIdentifier("true"));
+    assertEquals("\"key\"", ReadFn.quoteIdentifier("key"));
+    assertEquals("\"select\"", ReadFn.quoteIdentifier("select"));
+    assertEquals("\"from\"", ReadFn.quoteIdentifier("from"));
+    assertEquals("\"where\"", ReadFn.quoteIdentifier("where"));
+    assertEquals("\"table\"", ReadFn.quoteIdentifier("table"));
+    assertEquals("\"keyspace\"", ReadFn.quoteIdentifier("keyspace"));
+
+    // Test identifiers with existing quotes (should be escaped by doubling)
+    assertEquals("\"column\"\"with\"\"quotes\"", ReadFn.quoteIdentifier("column\"with\"quotes"));
+    assertEquals("\"single\"\"quote\"", ReadFn.quoteIdentifier("single\"quote"));
+    assertEquals("\"\"\"starts_with_quote\"", ReadFn.quoteIdentifier("\"starts_with_quote"));
+    assertEquals("\"ends_with_quote\"\"\"", ReadFn.quoteIdentifier("ends_with_quote\""));
+
+    // Test edge cases
+    assertEquals("\"\"", ReadFn.quoteIdentifier(""));
+    assertNull(ReadFn.quoteIdentifier(null));
+
+    // Test special characters that might be in identifiers
+    assertEquals("\"column with spaces\"", ReadFn.quoteIdentifier("column with spaces"));
+    assertEquals("\"column-with-dashes\"", ReadFn.quoteIdentifier("column-with-dashes"));
+    assertEquals("\"column.with.dots\"", ReadFn.quoteIdentifier("column.with.dots"));
+  }
+
+  /**
+   * Test reading from a table with reserved keyword column names. This integration test verifies
+   * the complete fix works end-to-end.
+   */
+  @Test
+  public void testReadWithReservedKeywordColumns() throws Exception {
+    String reservedTableName = "reserved_keywords_table";
+
+    // Create table with reserved keyword column names
+    String createTableQuery =
+        String.format(
+            "CREATE TABLE IF NOT EXISTS %s.%s("
+                + "\"true\" text, \"key\" text, \"select\" text, normal_column text, "
+                + "PRIMARY KEY (\"true\", \"key\")"
+                + ");",
+            CASSANDRA_KEYSPACE, reservedTableName);
+
+    session.execute(createTableQuery);
+
+    // Insert test data with reserved keyword column names
+    String insertQuery1 =
+        String.format(
+            "INSERT INTO %s.%s(\"true\", \"key\", \"select\", normal_column) "
+                + "VALUES ('true_value_1', 'key_value_1', 'select_value_1', 'normal_value_1');",
+            CASSANDRA_KEYSPACE, reservedTableName);
+    session.execute(insertQuery1);
+
+    String insertQuery2 =
+        String.format(
+            "INSERT INTO %s.%s(\"true\", \"key\", \"select\", normal_column) "
+                + "VALUES ('true_value_2', 'key_value_2', 'select_value_2', 'normal_value_2');",
+            CASSANDRA_KEYSPACE, reservedTableName);
+    session.execute(insertQuery2);
+
+    // Flush to ensure data is written
+    flushMemTablesAndRefreshSizeEstimates();
+
+    // Test reading with CassandraIO - this should work with the fix
+    PCollection<ReservedKeywordEntity> output =
+        pipeline.apply(
+            CassandraIO.<ReservedKeywordEntity>read()
+                .withHosts(Collections.singletonList(CASSANDRA_HOST))
+                .withPort(cassandraPort)
+                .withKeyspace(CASSANDRA_KEYSPACE)
+                .withTable(reservedTableName)
+                .withCoder(SerializableCoder.of(ReservedKeywordEntity.class))
+                .withEntity(ReservedKeywordEntity.class));
+
+    // Verify we can read the data successfully
+    PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(2L);
+
+    PAssert.that(output)
+        .satisfies(
+            input -> {
+              List<ReservedKeywordEntity> entities = new ArrayList<>();
+              input.forEach(entities::add);
+
+              assertEquals(2, entities.size());
+
+              // Check that data was read correctly
+              boolean foundFirst = false, foundSecond = false;
+              for (ReservedKeywordEntity entity : entities) {
+                if ("true_value_1".equals(entity.trueColumn)) {
+                  assertEquals("key_value_1", entity.keyColumn);
+                  assertEquals("select_value_1", entity.selectColumn);
+                  assertEquals("normal_value_1", entity.normalColumn);
+                  foundFirst = true;
+                } else if ("true_value_2".equals(entity.trueColumn)) {
+                  assertEquals("key_value_2", entity.keyColumn);
+                  assertEquals("select_value_2", entity.selectColumn);
+                  assertEquals("normal_value_2", entity.normalColumn);
+                  foundSecond = true;
+                }
+              }
+
+              assertTrue("Should find first test record", foundFirst);
+              assertTrue("Should find second test record", foundSecond);
+              return null;
+            });
+
+    pipeline.run();
+
+    // Clean up test table
+    session.execute(
+        String.format("DROP TABLE IF EXISTS %s.%s", CASSANDRA_KEYSPACE, reservedTableName));
+  }
+
+  /** Test reading with a custom query that includes reserved keyword column names. */
+  @Test
+  public void testReadWithCustomQueryAndReservedKeywords() throws Exception {
+    String customQueryTableName = "custom_query_test";
+
+    // Create table with reserved keyword column names
+    String createTableQuery =
+        String.format(
+            "CREATE TABLE IF NOT EXISTS %s.%s("
+                + "\"from\" text, \"where\" text, data text, "
+                + "PRIMARY KEY (\"from\", \"where\")"
+                + ");",
+            CASSANDRA_KEYSPACE, customQueryTableName);
+
+    session.execute(createTableQuery);
+
+    // Insert test data
+    String insertQuery =
+        String.format(
+            "INSERT INTO %s.%s(\"from\", \"where\", data) "
+                + "VALUES ('source1', 'condition1', 'test_data');",
+            CASSANDRA_KEYSPACE, customQueryTableName);
+    session.execute(insertQuery);
+
+    // Test with custom query that has WHERE clause - this tests the query building logic
+    String customQuery =
+        String.format(
+            "SELECT \"from\", \"where\", data FROM %s.%s WHERE \"from\"='source1'",
+            CASSANDRA_KEYSPACE, customQueryTableName);
+
+    PCollection<CustomQueryEntity> output =
+        pipeline.apply(
+            CassandraIO.<CustomQueryEntity>read()
+                .withHosts(Collections.singletonList(CASSANDRA_HOST))
+                .withPort(cassandraPort)
+                .withKeyspace(CASSANDRA_KEYSPACE)
+                .withTable(customQueryTableName)
+                .withQuery(customQuery)
+                .withCoder(SerializableCoder.of(CustomQueryEntity.class))
+                .withEntity(CustomQueryEntity.class));
+
+    PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L);
+
+    PAssert.that(output)
+        .satisfies(
+            input -> {
+              CustomQueryEntity entity = input.iterator().next();
+              assertEquals("source1", entity.fromColumn);
+              assertEquals("condition1", entity.whereColumn);
+              assertEquals("test_data", entity.data);
+              return null;
+            });
+
+    pipeline.run();
+
+    // Clean up
+    session.execute(
+        String.format("DROP TABLE IF EXISTS %s.%s", CASSANDRA_KEYSPACE, customQueryTableName));
+  }
+
+  /** Test that the fix handles multiple partition key columns with reserved keywords. */
+  @Test
+  public void testMultiplePartitionKeyReservedWords() throws Exception {
+    String multiPartitionTableName = "multi_partition_test";
+
+    // Create table with multiple partition key columns that are reserved keywords
+    String createTableQuery =
+        String.format(
+            "CREATE TABLE IF NOT EXISTS %s.%s("
+                + "\"table\" text, \"index\" text, \"value\" text, data text, "
+                + "PRIMARY KEY ((\"table\", \"index\"), \"value\")"
+                + ");",
+            CASSANDRA_KEYSPACE, multiPartitionTableName);
+
+    session.execute(createTableQuery);
+
+    // Insert test data
+    String insertQuery =
+        String.format(
+            "INSERT INTO %s.%s(\"table\", \"index\", \"value\", data) "
+                + "VALUES ('table1', 'index1', 'value1', 'test_data');",
+            CASSANDRA_KEYSPACE, multiPartitionTableName);
+    session.execute(insertQuery);
+
+    PCollection<MultiPartitionEntity> output =
+        pipeline.apply(
+            CassandraIO.<MultiPartitionEntity>read()
+                .withHosts(Collections.singletonList(CASSANDRA_HOST))
+                .withPort(cassandraPort)
+                .withKeyspace(CASSANDRA_KEYSPACE)
+                .withTable(multiPartitionTableName)
+                .withCoder(SerializableCoder.of(MultiPartitionEntity.class))
+                .withEntity(MultiPartitionEntity.class));
+
+    PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L);
+
+    PAssert.that(output)
+        .satisfies(
+            input -> {
+              MultiPartitionEntity entity = input.iterator().next();
+              assertEquals("table1", entity.tableColumn);
+              assertEquals("index1", entity.indexColumn);
+              assertEquals("value1", entity.valueColumn);
+              assertEquals("test_data", entity.data);
+              return null;
+            });
+
+    pipeline.run();
+
+    // Clean up
+    session.execute(
+        String.format("DROP TABLE IF EXISTS %s.%s", CASSANDRA_KEYSPACE, multiPartitionTableName));
+  }
+
+  /** Test that normal (non-reserved) identifiers still work correctly after the fix. */
+  @Test
+  public void testNormalIdentifiersStillWork() throws Exception {
+    // This test uses the existing CASSANDRA_TABLE which has normal column names
+    // to ensure our changes don't break existing functionality
+
+    PCollection<Scientist> output =
+        pipeline.apply(
+            CassandraIO.<Scientist>read()
+                .withHosts(Collections.singletonList(CASSANDRA_HOST))
+                .withPort(cassandraPort)
+                .withKeyspace(CASSANDRA_KEYSPACE)
+                .withTable(CASSANDRA_TABLE)
+                .withCoder(SerializableCoder.of(Scientist.class))
+                .withEntity(Scientist.class));
+
+    PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(NUM_ROWS);
+
+    pipeline.run();
+  }
+
+  // Add these entity classes after the existing entity classes at the end of the file
+
+  /** Test entity class for reserved keyword column names to verify identifier quoting. */
+  @Table(name = "reserved_keywords_table", keyspace = CASSANDRA_KEYSPACE)
+  static class ReservedKeywordEntity implements Serializable {
+
+    @PartitionKey
+    @Column(name = "true") // Reserved keyword as column name
+    String trueColumn;
+
+    @ClusteringColumn
+    @Column(name = "key") // Reserved keyword as column name
+    String keyColumn;
+
+    @Column(name = "select") // Reserved keyword as column name
+    String selectColumn;
+
+    @Column(name = "normal_column") // Normal column name
+    String normalColumn;
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ReservedKeywordEntity that = (ReservedKeywordEntity) o;
+      return Objects.equal(trueColumn, that.trueColumn)
+          && Objects.equal(keyColumn, that.keyColumn)
+          && Objects.equal(selectColumn, that.selectColumn)
+          && Objects.equal(normalColumn, that.normalColumn);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(trueColumn, keyColumn, selectColumn, normalColumn);
+    }
+
+    @Override
+    public String toString() {
+      return String.format(
+          "ReservedKeywordEntity{true='%s', key='%s', select='%s', normal='%s'}",
+          trueColumn, keyColumn, selectColumn, normalColumn);
+    }
+  }
+
+  /** Test entity for custom query test with reserved keyword column names. */
+  @Table(name = "custom_query_test", keyspace = CASSANDRA_KEYSPACE)
+  static class CustomQueryEntity implements Serializable {
+    @PartitionKey
+    @Column(name = "from")
+    String fromColumn;
+
+    @ClusteringColumn
+    @Column(name = "where")
+    String whereColumn;
+
+    @Column String data;
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      CustomQueryEntity that = (CustomQueryEntity) o;
+      return Objects.equal(fromColumn, that.fromColumn)
+          && Objects.equal(whereColumn, that.whereColumn)
+          && Objects.equal(data, that.data);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(fromColumn, whereColumn, data);
+    }
+  }
+
+  /** Test entity for multiple partition key test with reserved keywords. */
+  @Table(name = "multi_partition_test", keyspace = CASSANDRA_KEYSPACE)
+  static class MultiPartitionEntity implements Serializable {
+    @PartitionKey(0)
+    @Column(name = "table")
+    String tableColumn;
+
+    @PartitionKey(1)
+    @Column(name = "index")
+    String indexColumn;
+
+    @ClusteringColumn
+    @Column(name = "value")
+    String valueColumn;
+
+    @Column String data;
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      MultiPartitionEntity that = (MultiPartitionEntity) o;
+      return Objects.equal(tableColumn, that.tableColumn)
+          && Objects.equal(indexColumn, that.indexColumn)
+          && Objects.equal(valueColumn, that.valueColumn)
+          && Objects.equal(data, that.data);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(tableColumn, indexColumn, valueColumn, data);
+    }
+  }
 }

Reply via email to