Poorvankbhatia opened a new pull request, #38: URL: https://github.com/apache/flink-connector-cassandra/pull/38
### Cassandra Sink V2

This PR is a detailed Phase-1 implementation of [FLIP-533](https://cwiki.apache.org/confluence/display/FLINK/FLIP-533%3A+Cassandra+Sink+V2). It adds a new Cassandra sink for Flink that cleanly separates planning (CQL generation/binding) from writing. POJO writing goes through the DataStax Mapper; Row/RowData/Tuple & Scala.Product go through static & dynamic CQL:

- POJO: implemented with the DataStax Mapper (upsert semantics), with mapper options (TTL, consistency, saveNullFields)
- Row / RowData / Tuple / Scala.Product: support static CQL (INSERT/UPDATE) and dynamic CQL (per-record routing/clauses/values/customizers)

### What this unlocks

- **Static CQL (`Row`, `RowData`, `Tuple`, `Scala.Product`)**
  - Write your own `UPDATE ... SET ... WHERE ...` (incl. `IF` LWT, `USING TTL/TIMESTAMP`).
  - **Ignore nulls via `UNSET`** to avoid tombstones.
  - Per-statement **consistency/timeout** via customizer.
- **Dynamic CQL (`Row`, `RowData`)**
  - **Per-record routing**: resolve keyspace/table dynamically (multi-tenant, time-partitioned tables).
  - **Per-record values & partial updates**: choose which columns to `SET`, **skip nulls**.
  - **Per-record clauses**: `USING` and `IF` generated from the record.
  - **Per-record statement customization**: consistency and timeout decisions at write time.
  - **Prepared-statement caching** for throughput.

### Capabilities (what the sink supports)

**Static CQL (user-provided):**

- INSERT / UPDATE / LWT (IF)
- USING TTL / TIMESTAMP
- Quoted identifiers & composite keys
- Optional ignore-null via **UNSET** (avoid tombstones)
- Statement customization (consistency, timeout)

**Dynamic CQL (planned per record):**

- Table routing (incl. multi-keyspace) via `TableResolver`
- Composite-key updates and per-record value binding via `ColumnValueResolver`
- Per-record USING / IF via `CqlClauseResolver`
- Per-record `StatementCustomizer` (consistency/timeout)
- Prepared statement reuse via `PreparedStatementCache`

**Input formats:**

- POJO (DataStax Mapper; static INSERT upsert with mapper options)
- Row (CQL)
- RowData (CQL)

**Writer semantics** (a permit-pattern sketch follows this list):

- Bounded concurrency (permits) + backpressure
- Retries with fatal vs. retriable classification
- `flush()` waits for all in-flight operations
- Success/failure callbacks run on the **mailbox** (task thread)
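The permit-based model in the bullets above is the core of the writer's backpressure and `flush()` behavior. Below is a minimal, self-contained sketch of that general pattern; the class and the `writeAsync` placeholder are illustrative only and are not the PR's actual `CassandraSinkWriter` API.

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

/** Illustrative permit-based bounded concurrency; not the PR's actual writer implementation. */
final class BoundedConcurrencyWriterSketch {

    private final int maxConcurrentRequests;
    private final Semaphore permits;

    BoundedConcurrencyWriterSketch(int maxConcurrentRequests) {
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    void write(Object record) throws InterruptedException {
        permits.acquire(); // backpressure: blocks once maxConcurrentRequests writes are in flight
        writeAsync(record).whenComplete((ignored, error) -> {
            permits.release(); // release on success and on failure
            // The real writer additionally classifies errors (fatal vs. retriable), retries up to
            // maxRetries, and runs success/failure callbacks on the task mailbox thread.
        });
    }

    /** flush(): returns only once every in-flight write has completed. */
    void flush() throws InterruptedException {
        permits.acquire(maxConcurrentRequests);
        permits.release(maxConcurrentRequests);
    }

    private CompletableFuture<Void> writeAsync(Object record) {
        // Placeholder for the asynchronous Cassandra write (e.g. an executeAsync call).
        return CompletableFuture.completedFuture(null);
    }
}
```

Acquiring all permits in `flush()` is a simple way to wait for every in-flight write without tracking individual futures.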
**Metrics (representative):**

- Counters: `numRecordsOut`, `numRecordsOutErrors`, `retries`
- Histogram hooks for latencies

### Feature matrix

| Capability | POJO (DataStax Mapper) | Row (CQL) | Tuple (CQL) | Scala Product (CQL) | RowData (CQL) |
|---|:---:|:---:|:---:|:---:|:---:|
| INSERT | ✅ (mapper `.save()` upsert) | ✅ | ✅ | ✅ | ✅ |
| UPDATE | ❌ | ✅ | ✅ | ✅ | ✅ |
| LWT (`IF`, `IF EXISTS/NOT EXISTS`) | ❌ | ✅ | ✅ | ✅ | ✅ |
| Dynamic CQL (per-record table/key routing, clauses, values) | ❌ | ✅ | ✅ | ✅ | ✅ |
| Ignore nulls | ✅ (`saveNullFields=false`) | ✅ (UNSET/customizer) | ✅ (UNSET/customizer) | ✅ (UNSET/customizer) | ✅ (UNSET/customizer) |
| TTL / TIMESTAMP / Consistency | ✅ (mapper options) | ✅ | ✅ | ✅ | ✅ |
| Backpressure / Retries / Flush / Close / Metrics | ✅ | ✅ | ✅ | ✅ | ✅ |

### How users use it

**POJO TYPE**

```
PojoSinkConfig<MyPojo> cfg =
    PojoSinkConfig.forPojo(MyPojo.class)
        .withSaveNullFields(false)                      // UNSET nulls to avoid tombstones
        .withTtlSeconds(3600)                           // optional TTL
        .withConsistencyLevel(ConsistencyLevel.QUORUM); // optional

CassandraSink.<MyPojo>builder()
    .setClusterBuilder(clusterBuilder)
    .setRequestConfiguration(
        RequestConfiguration.builder()
            .setMaxConcurrentRequests(8)
            .setMaxRetries(3)
            .build())
    .setConfig(cfg)
    .build();
```

**CQL TYPES (STATIC QUERIES)**

```
CqlSinkConfig<Row> cfg =
    CqlSinkConfig.<Row>forRow()
        .withQuery("UPDATE ks.tbl USING TTL 600 SET a=?, b=? WHERE pk=? IF a=?");

CassandraSink.<Row>builder()
    .setClusterBuilder(clusterBuilder)
    .setRequestConfiguration(RequestConfiguration.builder().setMaxConcurrentRequests(8).build())
    .setConfig(cfg)
    .build();
```

**CQL TYPES (DYNAMIC)**

```
CqlSinkConfig<Row> cfg =
    CqlSinkConfig.<Row>forRow()
        .dynamic()
        .withTableResolver(/* per-record ks/table */)
        .withClauseResolver(/* USING TTL/TIMESTAMP, IF/LWT per record */)
        .withStatementCustomizer(/* consistency/timeout, ignore-null via UNSET, etc. */);

CassandraSink.<Row>builder()
    .setClusterBuilder(clusterBuilder)
    .setRequestConfiguration(RequestConfiguration.builder().setMaxConcurrentRequests(8).build())
    .setConfig(cfg)
    .build();
```

### What’s included (by area)

**Sink entrypoints**

- `CassandraSink`
- `CassandraSinkBuilder`
- `CassandraSinkWriter`

**Writer runtime & failure handling**

- `RecordWriterFactory`
- `CassandraFailureHandler`
- `CassandraFatalExceptionClassifier`
- `MaxRetriesExceededException`

**Planning (assemblers & strategy)**

- `PlannerAssembler`, `StaticPlannerAssembler`, `DynamicPlannerAssembler`
- `PlannerStrategy`, `ResolutionMode`
- `InsertStrategy` (`StaticInsertStrategy`)
- `UpdateStrategy` (`StaticUpdateStrategy`)
- `StatementPlanner`, `StatementPlannerFactory`, `PreparedStatementCache`

**Resolvers & helpers** (a routing sketch follows this section)

- **Tables:** `TableResolver`, `FixedTableResolver`, `TableRef`
- **Columns/values:** `ColumnValueResolver`, `FixedColumnValueResolver`, `RowDataFieldsResolver`, `ResolvedWrite`
- **Clauses:** `CqlClauseResolver`, `NoOpClauseResolver`, `ClauseBindings`
- **Customization:** `StatementCustomizer`, `NoOpCustomizer`, `NullUnsettingCustomizer`
- **Utilities:** `QueryParser`, `CqlStatementHelper`

**Configs**

- `RequestConfiguration` (concurrency, retries, timeout)
- `CassandraPluggableConfig`, `CassandraSinkConfig`
- `CqlSinkConfig`, `PojoSinkConfig`, `RowDataSinkConfig`
- `RecordFormatType`, `SinkPluggable`
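To make the dynamic path concrete, here is a hedged sketch of per-record routing. `TableResolver` and `TableRef` are names taken from the file list above, but the method shapes shown are assumptions for illustration, not the PR's actual interfaces.

```
import org.apache.flink.types.Row;

/** Illustration only: route each Row to a per-tenant keyspace and a day-partitioned table. */
final class TenantTableResolverSketch {

    /** Assumed resolver contract (record in, keyspace/table out); the PR's TableResolver may differ. */
    interface TableResolver<T> {
        TableRef resolve(T record);
    }

    /** Assumed keyspace/table holder standing in for the PR's TableRef. */
    static final class TableRef {
        final String keyspace;
        final String table;

        TableRef(String keyspace, String table) {
            this.keyspace = keyspace;
            this.table = table;
        }
    }

    /** Example routing: field 0 carries the tenant id, field 1 the partition day (e.g. "2024_06_01"). */
    static TableResolver<Row> perTenantDaily() {
        return row -> new TableRef(
                "ks_" + row.getField(0),
                "events_" + row.getField(1));
    }
}
```

Such a resolver would be passed to the dynamic config (the `.withTableResolver(...)` slot in the usage example above); `ColumnValueResolver` and `CqlClauseResolver` follow the same per-record idea for values and for `USING`/`IF` clauses.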
### Testing in this PR

**A. Unit tests (no Cassandra; fast & deterministic)**

- **Planning / Assemblers**
  - `DynamicPlannerAssemblerTest`, `StaticPlannerAssemblerTest`: assemble the `StatementPlanner` for the dynamic/static paths; verify routing, clause resolution, strategy selection.
- **Configs**
  - `RequestConfigurationTest`: concurrency/retry/timeout and builder validation.
  - `CqlSinkConfigTest`, `PojoSinkConfigTest`, `RowDataSinkConfigTest`: static vs. dynamic toggles, null-unsetting, customizers, format selection.
- **Strategies & Parsing**
  - `InsertStrategyTest`, `StaticInsertStrategyTest`, `UpdateStrategyTest`, `StaticUpdateStrategyTest`: CQL generation for INSERT/UPDATE, TTL/TIMESTAMP/IF.
  - `QueryParserTest`: user CQL parsing, placeholder counts, safety checks.
- **Resolvers & Model**
  - `ColumnValueResolverTest`, `FixedColumnValueResolverTest`: value mapping, null semantics.
  - `RowDataFieldsResolverTest`, `ResolvedWriteTest`: field extraction, composite-PK binding.
  - `TableRefTest`: keyspace/table/quoted identifiers.
- **Helpers & Factories**
  - `PreparedStatementCacheTest`: reuse/eviction behavior.
  - `CqlStatementHelperTest`: binding helpers & UNSET handling.
  - `StatementPlannerFactoryTest`, `StatementPlannerTest`: end-to-end plan → bind (incl. clause/customizer/multi-keyspace).
  - `RecordWriterFactoryTest`: picks the correct low-level writer per format.
  - `CassandraSinkBuilderTest`: public builder validation and option wiring.
- **Writer behavior (format-agnostic)**
  - `CassandraSinkWriterTest`: permits/backpressure, permit release (success/error), retries & max-retries, fatal short-circuit, `flush()`/`close()` semantics, mailbox callback threading, metrics, no deadlocks.

**B. Integration tests (real Cassandra; smoke coverage)**

- `CassandraCqlSinkITCase`: static CQL path (INSERT/UPDATE variants, USING TTL/TIMESTAMP/IF, quoted identifiers, composite keys, ignore-null via UNSET, fatal validation messages); also exercises dynamic table & column resolvers.
- `CassandraPojoSinkITCase`: POJO path (mapper options for TTL, consistency, and saveNullFields; upsert semantics; null-PK failure; missing table; TTL expiry).
- `CassandraSinkWriterITCase`: end-to-end writer loop with mailbox & permits (backpressure at `maxConcurrentRequests=1`, `flush()` waits, `close()` is idempotent, a fatal error from a bad table surfaces once).
