Hi Etienne,

Thank you for taking the time to review the FLIP. Please find the responses inline below:
> - config: usually in Flink we use factories to build from configuration
> rather than a visitor pattern which is quite associated with tree/graph
> or any collection to visit.

Thanks for the note. To clarify, the visitor pattern is used purely internally for runtime wiring and is not a substitute for Flink's factory-based configuration system. From the user's perspective, the sink is configured in the standard Flink way:
- For SQL/Table API users, we provide a DynamicTableSinkFactory that reads options like keyspace, table, ttl, etc., and builds the appropriate internal config (RowDataSinkConfig, PojoSinkConfig, etc.).
- For DataStream API users, we expose a typed builder interface to create sinks for POJO, Row, Tuple, or Scala Product types.

Internally, these config classes encapsulate the input format type, and we use a CassandraWriterVisitor to map each config to the corresponding writer:
- PojoSinkConfig → PojoRecordWriter (DataStax Mapper-based)
- RowDataSinkConfig → RowDataRecordWriter (uses logical type extractors)
- RowSinkConfig / TupleSinkConfig → CQLRecordWriter (explicit CQL binding)

These input types have different write paths, so the visitor pattern helps cleanly route each config to the correct writer without duplicating logic or relying on instanceof/enum checks. This pattern is fully internal: users only see a consistent factory-based or builder-based API and never need to interact with the visitor or writer types directly. (A simplified sketch of this routing is included further down in this mail.) Happy to share code if required.

> - parallel writes : what is the instantiation model of RecordWriter ?
> one per subtask writing incoming record one by one?

Yes, the instantiation model is one CassandraRecordWriter per subtask. Each Flink subtask creates and owns a writer instance during CassandraSinkWriter construction. Records are written individually and asynchronously: each call to write(input) immediately prepares and submits a Cassandra async request via the driver. There is no buffering or batching in Phase 1.

> - are semaphore permits (max number of inflight requests) ?

Correct. The semaphore enforces a cap on concurrent async requests per subtask (defined via SyncRequestConfiguration.maxConnections). If permits are exhausted, we yield to the Flink mailbox until permits are released or a timeout is reached. (A rough sketch of this is also included further down.)

> - does the cassandra client that writes include support for custom
> retry/backoff policies (e.g. exponential delay etc...)

While the DataStax Java driver (v3.11.2) <https://docs.datastax.com/en/developer/java-driver/3.11/manual/retries/index.html> does support pluggable RetryPolicy implementations, including custom logic, these policies are limited in important ways:
- No built-in support for backoff delays: retries happen immediately unless the user manually blocks with Thread.sleep(...), which might be unsafe in high-throughput environments.
- No per-record visibility: the driver sees retries in terms of Cassandra statements, not Flink stream records.
- No integration with Flink's checkpointing or threading model: the driver's retries happen on its own threads, outside of Flink's control.

To address these limitations, we implemented retries directly within the Flink SinkWriter. This gives us:
- Full control over retry timing, including the ability to add exponential backoff via MailboxExecutor.schedule(...), without blocking any threads.
- Stream-level visibility and metrics, such as retries per record, custom failure classification, and retry counters.
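For illustration, here is a heavily simplified sketch of the config-to-writer routing mentioned above. All class bodies are stubs and the exact names and signatures may differ from the FLIP; it is only meant to show the double-dispatch idea:

    // Heavily simplified stubs; the actual FLIP classes carry real state and logic.
    interface CassandraRecordWriter<IN> {
        void write(IN record);
    }

    final class PojoRecordWriter<IN> implements CassandraRecordWriter<IN> {
        @Override
        public void write(IN record) { /* DataStax Mapper-based write path */ }
    }

    final class RowDataRecordWriter implements CassandraRecordWriter<Object> {
        @Override
        public void write(Object record) { /* logical-type-extractor write path */ }
    }

    // Each config accepts a visitor (double dispatch), so no instanceof/enum checks are needed.
    interface CassandraSinkConfig {
        <T> T accept(SinkConfigVisitor<T> visitor);
    }

    final class PojoSinkConfig implements CassandraSinkConfig {
        @Override
        public <T> T accept(SinkConfigVisitor<T> visitor) { return visitor.visit(this); }
    }

    final class RowDataSinkConfig implements CassandraSinkConfig {
        @Override
        public <T> T accept(SinkConfigVisitor<T> visitor) { return visitor.visit(this); }
    }

    interface SinkConfigVisitor<T> {
        T visit(PojoSinkConfig config);
        T visit(RowDataSinkConfig config);
    }

    // Routes each config type to its matching writer; fully internal to the sink.
    final class CassandraWriterVisitor implements SinkConfigVisitor<CassandraRecordWriter<?>> {
        @Override
        public CassandraRecordWriter<?> visit(PojoSinkConfig config) { return new PojoRecordWriter<>(); }
        @Override
        public CassandraRecordWriter<?> visit(RowDataSinkConfig config) { return new RowDataRecordWriter(); }
    }

Wiring then amounts to something like config.accept(new CassandraWriterVisitor()) when the SinkWriter is constructed.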
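And a rough sketch of the in-flight request cap, assuming a plain java.util.concurrent.Semaphore combined with Flink's MailboxExecutor; the surrounding class is hypothetical and not the actual FLIP code:

    import java.util.concurrent.Semaphore;

    import org.apache.flink.api.common.operators.MailboxExecutor;

    // Illustrative only: caps the number of concurrent async Cassandra requests per subtask.
    final class InflightRequestLimiter {
        private final Semaphore permits;
        private final MailboxExecutor mailboxExecutor;
        private final long timeoutMs;

        InflightRequestLimiter(int maxInflight, MailboxExecutor mailboxExecutor, long timeoutMs) {
            this.permits = new Semaphore(maxInflight);
            this.mailboxExecutor = mailboxExecutor;
            this.timeoutMs = timeoutMs;
        }

        // Called before submitting a request; yields to the mailbox instead of blocking the thread.
        void acquire() throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (!permits.tryAcquire()) {
                if (System.currentTimeMillis() > deadline) {
                    throw new IllegalStateException("Timed out waiting for a free in-flight request slot");
                }
                // Lets other mailbox actions run, e.g. driver completion callbacks
                // (re-enqueued via the same mailbox executor) that release permits.
                mailboxExecutor.yield();
            }
        }

        // Called from the async completion callback of each Cassandra request.
        void release() {
            permits.release();
        }
    }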
> - there is no buffering of records, so when flink framework calls write,
> an async request is done on the fly with each request, right ?

Yes, that's correct. In Phase 1, there is no internal buffering: each call to write(input, context) immediately prepares a Cassandra statement and submits it asynchronously via the driver.

> *phase2 design:*
>
> - nice to see that the cassandra writes will now be per-partition,
> leveraging the high write throughput that Cassandra can do!

:)

> - need to migrate all the existing cassandra sources and sink to flink
> 2.1 before starting ?

Yes, to implement the Phase 2 batching sink using Flink's AsyncSinkWriter, we will need to migrate the Cassandra sink codebase to Flink 2.1. This is a prerequisite because FLIP-509 (introduced in 2.1) provides the necessary abstractions such as BufferWrapper, BatchCreator, and ElementConverter for partition-aware batching.

> - ElementConverter is not provided by the sink but by the user of the
> sink, as a mapping function from stream elements to RequestEntryT

To clarify, the ElementConverter <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181309759#FLIP171:AsyncSink-PublicInterfaces> is not provided by the end user, but rather by the sink itself. The CassandraAsyncSink will expose a builder-style interface where the appropriate ElementConverter (e.g., PojoElementConverter, RowDataElementConverter) is chosen internally based on the user's configuration or input type. So while the ElementConverter is configurable inside the sink, it is not something the user is expected to implement or supply directly. (A simplified sketch is appended after my signature.)

> - when group size in bytes or in nb of records is exceeded, the records
> are not added to the group (batch), make sure that these records are
> part of the Collection<RequestEntryT> that is snapshoted in AsyncSinkWriter

Yes, if a record exceeds the batch limits (count or size), we *don't drop or skip* it; it stays tracked in the buffer. The BatchCreator simply returns a partial batch, and the oversized record will be part of the next batch or flushed on its own. AsyncSinkWriter.snapshotState() includes all remaining buffered entries, including any that have not yet been flushed. (A small illustration of this is also appended below.)

Thanks,
Poorvank
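PS: to make the ElementConverter point concrete, below is a simplified sketch of a sink-provided converter, assuming FLIP-171's ElementConverter interface and a DataStax BoundStatement as the request entry type. The real converters in the FLIP (PojoElementConverter, RowDataElementConverter) and the actual request entry type (which likely needs to be a serializable wrapper rather than a driver statement) may look different:

    import com.datastax.driver.core.BoundStatement;
    import com.datastax.driver.core.PreparedStatement;

    import org.apache.flink.api.connector.sink2.SinkWriter;
    import org.apache.flink.connector.base.sink.writer.ElementConverter;
    import org.apache.flink.types.Row;

    // Illustrative only: a sink-provided converter that turns a Flink Row into a bound CQL statement.
    // Users never implement this; the sink builder picks the right converter internally.
    final class RowElementConverter implements ElementConverter<Row, BoundStatement> {

        private final PreparedStatement insertStatement;

        RowElementConverter(PreparedStatement insertStatement) {
            this.insertStatement = insertStatement;
        }

        @Override
        public BoundStatement apply(Row element, SinkWriter.Context context) {
            // Bind the row's fields positionally to the prepared INSERT statement.
            Object[] values = new Object[element.getArity()];
            for (int i = 0; i < element.getArity(); i++) {
                values[i] = element.getField(i);
            }
            return insertStatement.bind(values);
        }
    }

A real implementation would probably prepare the statement lazily (e.g., in open()) rather than hold a non-serializable PreparedStatement as a field; this sketch skips that concern.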
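Similarly, a tiny standalone illustration of the partial-batch behaviour described in the last answer. This is not the FLIP-509 BatchCreator interface, just the drain logic in isolation: entries that do not fit the current batch stay in the buffer, so they remain part of the writer's snapshot and of the next flush:

    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    // Simplified illustration only; the real batching lives in the AsyncSinkWriter/BatchCreator.
    final class PartialBatchExample {

        interface SizeEstimator<T> {
            long sizeInBytes(T entry);
        }

        static <T> List<T> createNextBatch(
                Deque<T> buffer, int maxRecords, long maxBytes, SizeEstimator<T> sizer) {
            List<T> batch = new ArrayList<>();
            long bytes = 0;
            while (!buffer.isEmpty() && batch.size() < maxRecords) {
                long nextSize = sizer.sizeInBytes(buffer.peekFirst());
                if (!batch.isEmpty() && bytes + nextSize > maxBytes) {
                    break; // size limit hit: return a partial batch, leave the rest buffered
                }
                // An entry larger than maxBytes on its own is still taken, i.e. "flushed alone".
                batch.add(buffer.pollFirst());
                bytes += nextSize;
            }
            return batch;
        }
    }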