Poorvankbhatia opened a new pull request, #38:
URL: https://github.com/apache/flink-connector-cassandra/pull/38

   ### Cassandra Sink V2
   This PR is a detailed Phase-1 implementation of [FLIP-533](https://cwiki.apache.org/confluence/display/FLINK/FLIP-533%3A+Cassandra+Sink+V2).
 
   It adds a new Cassandra sink for Flink that cleanly separates planning (CQL generation/binding) from writing.
   
   POJOs are written via the DataStax Mapper; Row/RowData/Tuple and Scala.Product are written via static and dynamic CQL:
   
   - POJO: implemented with DataStax Mapper (upsert semantics), mapper options 
(TTL, consistency, saveNullFields)
   - Row / RowData / Tuple / Scala.Product: support static CQL (INSERT/UPDATE) 
and dynamic CQL (per-record routing/clauses/values/customizers)
   
   ### What this unlocks
   
   - **Static CQL (`Row`, `RowData`, `Tuple`, `Scala.Product`)**
     - Write your own `UPDATE ... SET ... WHERE ...` (incl. `IF` LWT, `USING 
TTL/TIMESTAMP`).
     - **Ignore nulls via `UNSET`** to avoid tombstones.
     - Per-statement **consistency/timeout** via customizer.
   
   - **Dynamic CQL (`Row`, `RowData`)**
     - **Per-record routing**: resolve keyspace/table dynamically 
(multi-tenant, time-partitioned tables).
     - **Per-record values & partial updates**: choose which columns to `SET`, 
**skip nulls**.
     - **Per-record clauses**: `USING` and `IF` generated from the record.
     - **Per-record statement customization**: consistency and timeout 
decisions at write time.
     - **Prepared-statement caching** for throughput.
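To make the per-record routing and partial-update ideas above concrete, here is a minimal sketch in plain Java. It does not use the sink's actual classes: `DynamicUpdateSketch`, `resolveTable`, `plan`, and the `ks_<tenant>.events` naming scheme are all hypothetical, and identifiers are not quoted or validated as a real planner would need to do.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: plans one UPDATE per record, routing by tenant and skipping null columns.
final class DynamicUpdateSketch {

    /** CQL text plus bind values in placeholder order. */
    static final class Planned {
        final String cql;
        final List<Object> values;

        Planned(String cql, List<Object> values) {
            this.cql = cql;
            this.values = values;
        }
    }

    /** Assumed routing rule: one keyspace per tenant, fixed table name. */
    static String resolveTable(String tenant) {
        return "ks_" + tenant + ".events";
    }

    static Planned plan(String tenant, Map<String, Object> setColumns, String pkColumn, Object pkValue) {
        List<String> assignments = new ArrayList<>();
        List<Object> values = new ArrayList<>();
        for (Map.Entry<String, Object> e : setColumns.entrySet()) {
            if (e.getValue() == null) {
                continue; // skip nulls so the statement never writes a null (no tombstone)
            }
            assignments.add(e.getKey() + "=?");
            values.add(e.getValue());
        }
        if (assignments.isEmpty()) {
            throw new IllegalArgumentException("no non-null columns to SET");
        }
        values.add(pkValue); // the WHERE placeholder comes last
        String cql = "UPDATE " + resolveTable(tenant)
                + " SET " + String.join(", ", assignments)
                + " WHERE " + pkColumn + "=?";
        return new Planned(cql, values);
    }

    public static void main(String[] args) {
        Map<String, Object> cols = new LinkedHashMap<>();
        cols.put("a", 1);
        cols.put("b", null);
        cols.put("c", "x");
        Planned p = plan("acme", cols, "id", 42);
        System.out.println(p.cql + " " + p.values);
    }
}
```

Because the set of non-null columns varies per record, the generated CQL text varies too, which is why the dynamic path pairs naturally with prepared-statement caching.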
   
   ### Capabilities (what the sink supports)
   
   **Static CQL (user-provided):**
   - INSERT / UPDATE / LWT (IF)
   - USING TTL / TIMESTAMP
   - Quoted identifiers & composite keys
   - Optional ignore-null via **UNSET** (avoid tombstones)
   - Statement customization (consistency, timeout)
   
   **Dynamic CQL (planned per record):**
   - Table routing (incl. multi-keyspace) via `TableResolver`
   - Composite-key updates and per-record value binding via 
`ColumnValueResolver`
   - Per-record USING / IF via `CqlClauseResolver`
   - Per-record `StatementCustomizer` (consistency/timeout)
   - Prepared statement reuse via `PreparedStatementCache`
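As an illustration of prepared-statement reuse, the following is a small LRU cache keyed by CQL text. It is a sketch only and not the PR's `PreparedStatementCache`: the class name `LruStatementCacheSketch`, the size bound, and the `preparer` function (a stand-in for a driver-side prepare call) are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: bounded LRU cache that prepares each distinct CQL string at most once while cached.
final class LruStatementCacheSketch<S> {
    private final Map<String, S> cache;
    private final Function<String, S> preparer;
    private int prepareCalls = 0;

    LruStatementCacheSketch(int maxSize, Function<String, S> preparer) {
        this.preparer = preparer;
        // accessOrder=true makes iteration order least-recently-used first.
        this.cache = new LinkedHashMap<String, S>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, S> eldest) {
                return size() > maxSize; // evict the LRU entry once over capacity
            }
        };
    }

    synchronized S get(String cql) {
        S stmt = cache.get(cql); // a hit also refreshes the entry's recency
        if (stmt == null) {
            stmt = preparer.apply(cql);
            prepareCalls++;
            cache.put(cql, stmt);
        }
        return stmt;
    }

    synchronized int size() {
        return cache.size();
    }

    synchronized int prepareCalls() {
        return prepareCalls;
    }

    public static void main(String[] args) {
        LruStatementCacheSketch<String> c = new LruStatementCacheSketch<>(2, s -> "prepared:" + s);
        c.get("INSERT A");
        c.get("INSERT A");
        System.out.println("prepare calls: " + c.prepareCalls());
    }
}
```

A bounded cache matters on the dynamic path, where per-record routing and clauses can produce an open-ended number of distinct statements.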
   
   **Input formats:**
   - POJO (DataStax Mapper; static INSERT upsert with mapper options)
   - Row (CQL)
   - RowData (CQL)
   
   **Writer semantics:**
   - Bounded concurrency (permits) + backpressure
   - Retries with fatal vs. retriable classification
   - `flush()` waits for all in-flight operations
   - Success/failure callbacks run on the **mailbox** (task thread)
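The permit, retry, and flush semantics listed above can be sketched with a `Semaphore` and `CompletableFuture`. This is an illustrative model under stated assumptions, not the `CassandraSinkWriter` implementation: the class and method names are hypothetical, treating `IllegalArgumentException` as "fatal" is an arbitrary stand-in for the real classifier, and callbacks run on whichever thread completes the future rather than on the Flink mailbox.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch: bounded in-flight writes with retries and a blocking flush.
final class BoundedWriterSketch {
    private final int maxConcurrent;
    private final int maxRetries;
    private final Semaphore permits;
    final AtomicInteger successes = new AtomicInteger();
    final AtomicInteger failures = new AtomicInteger();
    final AtomicInteger retries = new AtomicInteger();

    BoundedWriterSketch(int maxConcurrent, int maxRetries) {
        this.maxConcurrent = maxConcurrent;
        this.maxRetries = maxRetries;
        this.permits = new Semaphore(maxConcurrent);
    }

    /** Assumed classification: argument errors are fatal, everything else is retriable. */
    static boolean isFatal(Throwable t) {
        return t instanceof IllegalArgumentException;
    }

    /** Blocks while all permits are taken; that blocking is the backpressure. */
    void write(Supplier<CompletableFuture<Void>> op) {
        permits.acquireUninterruptibly();
        attempt(op, 0);
    }

    private void attempt(Supplier<CompletableFuture<Void>> op, int attemptNo) {
        op.get().whenComplete((ok, err) -> {
            Throwable cause =
                    (err instanceof CompletionException && err.getCause() != null) ? err.getCause() : err;
            if (cause == null) {
                successes.incrementAndGet();
                permits.release();
            } else if (!isFatal(cause) && attemptNo < maxRetries) {
                retries.incrementAndGet();
                attempt(op, attemptNo + 1); // the permit stays held across retries
            } else {
                failures.incrementAndGet();
                permits.release(); // always release on a terminal outcome
            }
        });
    }

    /** Returns only once no write is in flight: holding every permit proves that. */
    void flush() {
        permits.acquireUninterruptibly(maxConcurrent);
        permits.release(maxConcurrent);
    }

    public static void main(String[] args) {
        BoundedWriterSketch w = new BoundedWriterSketch(2, 3);
        w.write(() -> CompletableFuture.completedFuture(null));
        w.flush();
        System.out.println("successes=" + w.successes.get() + " failures=" + w.failures.get());
    }
}
```

Releasing the permit on both terminal paths (success and final failure) is the property the writer tests call out as "permit release (success/error)"; forgetting either one would make `flush()` hang.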
   
   **Metrics (representative):**
   - Counters: `numRecordsOut`, `numRecordsOutErrors`, `retries`
   - Histogram hooks for latencies
   
   ### Feature matrix
   
   | Capability | POJO (DataStax Mapper) | Row (CQL) | Tuple (CQL) | Scala Product (CQL) | RowData (CQL) |
   |---|:---:|:---:|:---:|:---:|:---:|
   | INSERT | ✅ (mapper `.save()` upsert) | ✅ | ✅ | ✅ | ✅ |
   | UPDATE | ❌ | ✅ | ✅ | ✅ | ✅ |
   | LWT (`IF`, `IF EXISTS/NOT EXISTS`) | ❌ | ✅ | ✅ | ✅ | ✅ |
   | Dynamic CQL (per-record table/key routing, clauses, values) | ❌ | ✅ | ✅ | ✅ | ✅ |
   | Ignore nulls | ✅ (`saveNullFields=false`) | ✅ (UNSET/customizer) | ✅ (UNSET/customizer) | ✅ (UNSET/customizer) | ✅ (UNSET/customizer) |
   | TTL / TIMESTAMP / Consistency | ✅ (mapper options) | ✅ | ✅ | ✅ | ✅ |
   | Backpressure / Retries / Flush / Close / Metrics | ✅ | ✅ | ✅ | ✅ | ✅ |
   
   ### How users use it
   
   **POJO TYPE**
   
   ```
   PojoSinkConfig<MyPojo> cfg =
       PojoSinkConfig.forPojo(MyPojo.class)
           .withSaveNullFields(false)      // UNSET nulls to avoid tombstones
           .withTtlSeconds(3600)           // optional TTL
           .withConsistencyLevel(ConsistencyLevel.QUORUM); // optional
   
   CassandraSink.<MyPojo>builder()
       .setClusterBuilder(clusterBuilder)
       .setRequestConfiguration(
           RequestConfiguration.builder()
               .setMaxConcurrentRequests(8)
               .setMaxRetries(3)
               .build())
       .setConfig(cfg)
       .build();
   ```
   
   
   **CQL TYPES (STATIC QUERIES)**
   
   ```
   CqlSinkConfig<Row> cfg =
       CqlSinkConfig.<Row>forRow()
           .withQuery("UPDATE ks.tbl USING TTL 600 SET a=?, b=? WHERE pk=? IF a=?");
   
   CassandraSink.<Row>builder()
       .setClusterBuilder(clusterBuilder)
       .setRequestConfiguration(
           RequestConfiguration.builder()
               .setMaxConcurrentRequests(8)
               .build())
       .setConfig(cfg)
       .build();
   
   ```
   
   
   **CQL TYPES (DYNAMIC)**
   
   ```
   CqlSinkConfig<Row> cfg =
       CqlSinkConfig.<Row>forRow()
           .dynamic()
           .withTableResolver(/* per-record ks/table */)
           .withClauseResolver(/* USING TTL/TIMESTAMP, IF/LWT per record */)
           .withStatementCustomizer(/* consistency/timeout, ignore-null via UNSET, etc. */);
   
   CassandraSink.<Row>builder()
       .setClusterBuilder(clusterBuilder)
       .setRequestConfiguration(
           RequestConfiguration.builder()
               .setMaxConcurrentRequests(8)
               .build())
       .setConfig(cfg)
       .build();
   ```
   
   ### What’s included (by area)
   
   **Sink entrypoints**
   - `CassandraSink`
   - `CassandraSinkBuilder`
   - `CassandraSinkWriter`
   
   **Writer runtime & failure handling**
   - `RecordWriterFactory`
   - `CassandraFailureHandler`
   - `CassandraFatalExceptionClassifier`
   - `MaxRetriesExceededException`
   
   **Planning (assemblers & strategy)**
   - `PlannerAssembler`, `StaticPlannerAssembler`, `DynamicPlannerAssembler`
   - `PlannerStrategy`, `ResolutionMode`
   - `InsertStrategy` (`StaticInsertStrategy`)
   - `UpdateStrategy` (`StaticUpdateStrategy`)
   - `StatementPlanner`, `StatementPlannerFactory`, `PreparedStatementCache`
   
   **Resolvers & helpers**
   - **Tables:** `TableResolver`, `FixedTableResolver`, `TableRef`
   - **Columns/values:** `ColumnValueResolver`, `FixedColumnValueResolver`, 
`RowDataFieldsResolver`, `ResolvedWrite`
   - **Clauses:** `CqlClauseResolver`, `NoOpClauseResolver`, `ClauseBindings`
   - **Customization:** `StatementCustomizer`, `NoOpCustomizer`, 
`NullUnsettingCustomizer`
   - **Utilities:** `QueryParser`, `CqlStatementHelper`
   
   **Configs**
   - `RequestConfiguration` (concurrency, retries, timeout)
   - `CassandraPluggableConfig`, `CassandraSinkConfig`
   - `CqlSinkConfig`, `PojoSinkConfig`, `RowDataSinkConfig`
   - `RecordFormatType`, `SinkPluggable`
   
   
   ### Testing in this PR
   
   **A. Unit tests (no Cassandra; fast & deterministic)**
   
   - **Planning / Assemblers**
     - `DynamicPlannerAssemblerTest`, `StaticPlannerAssemblerTest` — assemble 
`StatementPlanner` for dynamic/static paths; verify routing, clause resolution, 
strategy selection.
   - **Configs**
     - `RequestConfigurationTest` — concurrency/retry/timeout and builder 
validation.
     - `CqlSinkConfigTest`, `PojoSinkConfigTest`, `RowDataSinkConfigTest` — 
static vs dynamic toggles, null-unsetting, customizers, format selection.
   - **Strategies & Parsing**
     - `InsertStrategyTest`, `StaticInsertStrategyTest`, `UpdateStrategyTest`, 
`StaticUpdateStrategyTest` — CQL generation for INSERT/UPDATE, TTL/TIMESTAMP/IF.
     - `QueryParserTest` — user CQL parsing, placeholder counts, safety checks.
   - **Resolvers & Model**
     - `ColumnValueResolverTest`, `FixedColumnValueResolverTest` — value 
mapping, null semantics.
     - `RowDataFieldsResolverTest`, `ResolvedWriteTest` — field extraction, 
composite PK binding.
     - `TableRefTest` — keyspace/table/quoted identifiers.
   - **Helpers & Factories**
     - `PreparedStatementCacheTest` — reuse/eviction behavior.
     - `CqlStatementHelperTest` — binding helpers & UNSET handling.
     - `StatementPlannerFactoryTest`, `StatementPlannerTest` — end-to-end plan 
→ bind (incl. clause/customizer/multi-keyspace).
     - `RecordWriterFactoryTest` — picks correct low-level writer per format.
     - `CassandraSinkBuilderTest` — public builder validation and option wiring.
   - **Writer behavior (format-agnostic)**
     - `CassandraSinkWriterTest` — permits/backpressure, permit release 
(success/error), retries & max-retries, fatal short-circuit, 
`flush()`/`close()` semantics, mailbox callback threading, metrics, no 
deadlocks.
   
   **B. Integration tests (real Cassandra; smoke coverage)**
   
   - `CassandraCqlSinkITCase` — static CQL path: INSERT/UPDATE variants, USING 
TTL/TIMESTAMP/IF, quoted identifiers, composite keys, ignore-null (UNSET), 
fatal validation messages. Also covers the dynamic table and column resolvers.
   - `CassandraPojoSinkITCase` — POJO path: mapper options (TTL, consistency, 
saveNullFields), upsert semantics, null-PK failure, missing table, TTL expiry.
   - `CassandraSinkWriterITCase` — end-to-end writer loop with mailbox & 
permits: backpressure at `maxConcurrentRequests=1`, `flush()` waits, `close()` 
idempotent, fatal error (bad table) surfaces once.
   
   

