Hi Etienne,
Thank you for talking out the time to review the FLIP:

Please find the responses inline below:

>
> - config: usually in Flink we use factories to build from configuration
> rather than a visitor pattern which is quite associated with tree/graph
> or any collection to visit.
>

Thanks for the note — to clarify, the visitor pattern is used purely
internally for runtime wiring and is not a substitute for Flink’s
factory-based configuration system.
>From the user’s perspective, the sink is configured in the standard Flink
way:
  - For SQL/Table API users, we provide a DynamicTableSinkFactory that
reads options like keyspace, table, ttl, etc., and builds the appropriate
internal config (RowDataSinkConfig, PojoSinkConfig, etc.).
  - For DataStream API users, we expose a typed builder interface to create
sinks for POJO, Row, Tuple, or Scala Product types.
Internally, these config classes encapsulate the input format type, and we
use a CassandraWriterVisitor to map each config to the corresponding
writer:
PojoSinkConfig → PojoRecordWriter (DataStax Mapper-based)
RowDataSinkConfig → RowDataRecordWriter (uses logical type extractors)
RowSinkConfig / TupleSinkConfig → CQLRecordWriter (explicit CQL binding)
These input types have different write paths, so the visitor pattern helps
cleanly route config to the correct writer without duplicating logic or
relying on instanceof/enum checks. This pattern is fully internal — users
only see a consistent factory-based or builder-based API, and never need to
interact with the visitor or writer types directly.
Happy to share code if required.


>
> - parallel writes : what is the instanciation model of RecordWriter ?
> one per subtask writing incoming record one by one?
>

Yes, the instantiation model is one CassandraRecordWriter per subtask. Each
Flink subtask creates and owns a writer instance during CassandraSinkWriter
construction.
Records are written individually and asynchronously — each call to
write(input) immediately prepares and submits a Cassandra async request via
the driver. There is no buffering or batching in Phase 1.


>
> - are semaphore permits (max number of inflight requests) ?
>

Correct. The semaphore enforces a cap on concurrent async requests per
subtask (defined via SyncRequestConfiguration.maxConnections). If permits
are exhausted, we yield to the Flink mailbox until permits are released or
a timeout is reached.

>
> - does the cassandra client that writes include support for custom
> retry/backoff policies (e.g. exponential delay etc...)
>
>
While the DataStax Java driver (v3.11.2)
<https://docs.datastax.com/en/developer/java-driver/3.11/manual/retries/index.html>
does support pluggable RetryPolicy implementations, including custom logic,
these policies are limited in important ways:
  - No built-in support for backoff delays — retries happen immediately,
unless the user manually blocks with Thread.sleep(...), which might be
unsafe in high-throughput environments.
  - No per-record visibility — the driver sees retries in terms of
Cassandra statements, not Flink stream records.
  - No integration with Flink’s checkpointing or threading model — the
driver retries happen in its own threads, outside of Flink’s control.

To address these limitations, we implemented retries directly within the
Flink SinkWriter. This gives us:
Full control over retry timing, including the ability to add exponential
backoff via MailboxExecutor.schedule(...), without blocking any threads.
Stream-level visibility and metrics, such as retries per record, custom
failure classification, and retry counters.


> - there is no buffering of records, so when flink framework calls write,
> an async request is done on the fly with each request, right ?
>
>
Yes, that's correct. In Phase 1, there is no internal buffering — each call
to write(input, context) immediately prepares a Cassandra statement and
submits it asynchronously via the driver.


> *phase2 design:*
>
> - nice to see that the cassandra writes will now be per-partition,
> leveraging the high write throughput that Cassandra can do.!
>

:)

>
> - need to migrate all the existing cassandra sources and sink to flink
> 2.1 before starting ?
>

Yes, to implement the Phase 2 batching sink using Flink's AsyncSinkWriter,
we will need to migrate the Cassandra sink codebase to Flink 2.1. This is a
prerequisite because FLIP-509 (introduced in 2.1) provides the necessary
abstractions such as BufferWrapper, BatchCreator, and ElementConverter for
partition-aware batching.


>
> - ElementConverter is not provided by the sink but by the user of the
> sink, as a mapping function from stream elements to RequestEntryT
>

To clarify, the ElementConverter
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181309759#FLIP171:AsyncSink-PublicInterfaces>
is not provided by the end user, but rather by the sink itself. The
CassandraAsyncSink will expose a builder-style interface where the
appropriate ElementConverter (e.g., PojoElementConverter,
RowDataElementConverter) is chosen internally based on the user’s
configuration or input type.

So while the ElementConverter is configurable inside the sink, it is not
something the user is expected to implement or supply directly.

>
> - when group size in bytes or in nb of records is exceeded, the records
> are not added to the group (batch), make sure that these records are
> part of the Collection<RequestEntryT> that is snapshoted in AsyncSinkWriter
>
>
Yes, if a record exceeds batch limits (count or size), we *don’t drop or
skip* it—we still track it in the buffer. The BatchCreator simply returns a
partial batch, and the oversized record will be part of the next batch or
flushed alone. AsyncSinkWriter.snapshotState() includes all remaining
buffered entries, including any that were not yet flushed.

Thanks,
Poorvank

Reply via email to