Hi Poorvank,
Thanks, comments inline.
On 04/06/2025 at 18:24, Poorvank Bhatia wrote:
Hi Etienne,
Thank you for taking the time to review the FLIP.
Please find the responses inline below:
- config: usually in Flink we use factories to build from configuration
rather than a visitor pattern, which is more commonly associated with
visiting trees/graphs or other collections.
Thanks for the note — to clarify, the visitor pattern is used purely
internally for runtime wiring and is not a substitute for Flink’s
factory-based configuration system.
From the user’s perspective, the sink is configured in the standard Flink
way:
- For SQL/Table API users, we provide a DynamicTableSinkFactory that
reads options like keyspace, table, ttl, etc., and builds the appropriate
internal config (RowDataSinkConfig, PojoSinkConfig, etc.).
- For DataStream API users, we expose a typed builder interface to create
sinks for POJO, Row, Tuple, or Scala Product types.
Internally, these config classes encapsulate the input format type, and we
use a CassandraWriterVisitor to map each config to the corresponding
writer:
PojoSinkConfig → PojoRecordWriter (DataStax Mapper-based)
RowDataSinkConfig → RowDataRecordWriter (uses logical type extractors)
RowSinkConfig / TupleSinkConfig → CQLRecordWriter (explicit CQL binding)
These input types have different write paths, so the visitor pattern helps
cleanly route config to the correct writer without duplicating logic or
relying on instanceof/enum checks. This pattern is fully internal — users
only see a consistent factory-based or builder-based API, and never need to
interact with the visitor or writer types directly.
Happy to share code if required.
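For illustration, here is a rough sketch of the internal double dispatch.
Class names mirror the FLIP, but signatures and bodies are simplified
placeholders, not the final API:

interface CassandraRecordWriter {
    void write(Object record);
}

class PojoRecordWriter implements CassandraRecordWriter {
    public void write(Object record) { /* DataStax Mapper-based write path */ }
}

class RowDataRecordWriter implements CassandraRecordWriter {
    public void write(Object record) { /* logical-type-extractor write path */ }
}

class CassandraWriterVisitor {
    CassandraRecordWriter visit(PojoSinkConfig config) { return new PojoRecordWriter(); }
    CassandraRecordWriter visit(RowDataSinkConfig config) { return new RowDataRecordWriter(); }
}

abstract class SinkConfig {
    // Each concrete config dispatches to its own visit(...) overload,
    // so writer selection needs no instanceof or enum checks.
    abstract CassandraRecordWriter accept(CassandraWriterVisitor visitor);
}

class PojoSinkConfig extends SinkConfig {
    CassandraRecordWriter accept(CassandraWriterVisitor visitor) { return visitor.visit(this); }
}

class RowDataSinkConfig extends SinkConfig {
    CassandraRecordWriter accept(CassandraWriterVisitor visitor) { return visitor.visit(this); }
}

Writer creation is then just config.accept(new CassandraWriterVisitor()):
adding a new input type means adding a config class and one visit(...)
overload, while the user-facing factory/builder stays unchanged.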
makes sense. LGTM, considering the user-facing API seems coherent. Maybe
add some details to the FLIP about the user configuration of the sink
(not an exhaustive list of parameters, but the general configuration
system per input type as explained above)
- parallel writes: what is the instantiation model of RecordWriter?
One per subtask, writing incoming records one by one?
Yes, the instantiation model is one CassandraRecordWriter per subtask. Each
Flink subtask creates and owns a writer instance during CassandraSinkWriter
construction.
Records are written individually and asynchronously — each call to
write(input) immediately prepares and submits a Cassandra async request via
the driver. There is no buffering or batching in Phase 1.
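As a rough sketch of that write path (assuming the DataStax 3.11 async API;
class and method names here are illustrative):

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;

class CqlWritePath {

    private final Session session;
    private final PreparedStatement insert;

    CqlWritePath(Session session, PreparedStatement insert) {
        this.session = session;
        this.insert = insert;
    }

    // Called once per incoming record from SinkWriter#write: bind and submit
    // immediately, no buffering, one async request per record.
    ResultSetFuture writeAsync(Object... values) {
        BoundStatement bound = insert.bind(values);
        return session.executeAsync(bound);
    }
}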
thanks for confirmation. Please clarify this in the FLIP
- are semaphore permits (max number of inflight requests)?
Correct. The semaphore enforces a cap on concurrent async requests per
subtask (defined via SyncRequestConfiguration.maxConnections). If permits
are exhausted, we yield to the Flink mailbox until permits are released or
a timeout is reached.
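Roughly, the mechanism looks like the following sketch (names are
illustrative; the real implementation also applies the configured timeout):

import java.util.concurrent.Semaphore;

import org.apache.flink.api.common.operators.MailboxExecutor;

class InflightRequestLimiter {

    private final Semaphore permits;        // cap on concurrent async requests per subtask
    private final MailboxExecutor mailbox;

    InflightRequestLimiter(int maxInflightRequests, MailboxExecutor mailbox) {
        this.permits = new Semaphore(maxInflightRequests);
        this.mailbox = mailbox;
    }

    // Called before submitting a request: if all permits are taken, yield to the
    // mailbox so completion callbacks (and checkpoints) can still make progress.
    void acquire() throws InterruptedException {
        while (!permits.tryAcquire()) {
            mailbox.yield();
        }
    }

    // Called from the driver's completion callback: hand the release back to the
    // mailbox thread, which also wakes up any pending yield() above.
    void release() {
        mailbox.execute(() -> permits.release(), "release Cassandra in-flight permit");
    }
}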
Sorry, I missed part of my sentence: I wanted to know if it was
configurable, but you answered that. Though, I find the name misleading:
these are requests emitted on the fly (as opposed to batched requests) but
still asynchronous, so I would not call the parameter SyncRequest*.
Please add the conf parameter to the FLIP
- does the Cassandra client that writes include support for custom
retry/backoff policies (e.g. exponential delay, etc.)?
While the DataStax Java driver (v3.11.2)
<https://docs.datastax.com/en/developer/java-driver/3.11/manual/retries/index.html>
does support pluggable RetryPolicy implementations, including custom logic,
these policies are limited in important ways:
- No built-in support for backoff delays — retries happen immediately,
unless the user manually blocks with Thread.sleep(...), which might be
unsafe in high-throughput environments.
agree
- No per-record visibility — the driver sees retries in terms of
Cassandra statements, not Flink stream records.
sure
- No integration with Flink’s checkpointing or threading model — the
driver retries happen in its own threads, outside of Flink’s control.
To address these limitations, we implemented retries directly within the
Flink SinkWriter. This gives us:
- Full control over retry timing, including the ability to add exponential
backoff via MailboxExecutor.schedule(...), without blocking any threads (a
small sketch of the backoff bookkeeping follows below).
- Stream-level visibility and metrics, such as retries per record, custom
failure classification, and retry counters.
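To illustrate the backoff part only, a minimal sketch of the per-record retry
bookkeeping (names and defaults are assumptions; the resulting delay would be
scheduled through Flink's mailbox/timer facilities rather than Thread.sleep):

import java.time.Duration;

final class RetryState {

    private final int maxRetries;
    private final Duration baseBackoff;
    private final Duration maxBackoff;
    private int attempts;

    RetryState(int maxRetries, Duration baseBackoff, Duration maxBackoff) {
        this.maxRetries = maxRetries;
        this.baseBackoff = baseBackoff;
        this.maxBackoff = maxBackoff;
    }

    boolean canRetry() {
        return attempts < maxRetries;
    }

    // Exponential backoff: base * 2^attempts, capped at maxBackoff.
    Duration nextDelay() {
        long delayMs = baseBackoff.toMillis() << Math.min(attempts, 30);
        attempts++;
        return Duration.ofMillis(Math.min(delayMs, maxBackoff.toMillis()));
    }
}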
it looks good, I just wanted to know if the user had enhanced retry
capabilities. Maybe you could add such detail to the FLIP.
I like the general aim to avoid forking threads and to rely on Flink's
threads (other released connectors do that also), provided that we don't
occupy the thread pools too much and risk reducing task parallelism
(sub-optimal performance) or, in the worst case, ending up with no
threads available for the scheduler.
- there is no buffering of records, so when the Flink framework calls write,
an async request is made on the fly for each record, right?
Yes, that's correct. In Phase 1, there is no internal buffering — each call
to write(input, context) immediately prepares a Cassandra statement and
submits it asynchronously via the driver.
thanks for confirmation
*Phase 2 design:*
- nice to see that the Cassandra writes will now be per-partition,
leveraging the high write throughput that Cassandra can deliver!
:)
- need to migrate all the existing Cassandra sources and sinks to Flink
2.1 before starting?
Yes, to implement the Phase 2 batching sink using Flink's AsyncSinkWriter,
we will need to migrate the Cassandra sink codebase to Flink 2.1. This is a
prerequisite because FLIP-509 (introduced in 2.1) provides the necessary
abstractions such as BufferWrapper, BatchCreator, and ElementConverter for
partition-aware batching.
This represents some effort, but it is a good opportunity to migrate the old
pieces or start deprecating them. The FLIP-27 source is quite recent, so its
migration should be fairly easy, but for the other pieces you might end up
with parts that rely on APIs already dropped in Flink 2.1, as the connector
release cycle is now separate from the Flink core release cycle.
- ElementConverter is not provided by the sink but by the user of the
sink, as a mapping function from stream elements to RequestEntryT
To clarify, the ElementConverter
<https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181309759#FLIP171:AsyncSink-PublicInterfaces>
is not provided by the end user, but rather by the sink itself. The
CassandraAsyncSink will expose a builder-style interface where the
appropriate ElementConverter (e.g., PojoElementConverter,
RowDataElementConverter) is chosen internally based on the user’s
configuration or input type.
So while the ElementConverter is configurable inside the sink, it is not
something the user is expected to implement or supply directly.
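For illustration, this is roughly what one of the internally chosen
converters could look like (assuming the FLIP-171 ElementConverter interface;
CassandraRequestEntry and fromRowData are hypothetical placeholders):

import java.io.Serializable;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.table.data.RowData;

// Hypothetical request-entry type that the Cassandra async sink would buffer and batch.
class CassandraRequestEntry implements Serializable {
    static CassandraRequestEntry fromRowData(RowData row) {
        // would capture the values needed to bind the CQL statement later
        return new CassandraRequestEntry();
    }
}

// Selected internally by the sink builder when the input type is RowData;
// users never implement or supply an ElementConverter themselves.
class RowDataElementConverter implements ElementConverter<RowData, CassandraRequestEntry> {
    @Override
    public CassandraRequestEntry apply(RowData element, SinkWriter.Context context) {
        return CassandraRequestEntry.fromRowData(element);
    }
}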
This is clearer, thanks. I have not checked what the recently released
connectors do for the ElementConverter; I was just relying on what is
said in FLIP-171. Anyway, as usual, simplifying the user
configuration API by avoiding having to provide external objects is a
good thing.
- when the group size in bytes or in number of records is exceeded, the
records are not added to the group (batch); make sure that these records
are part of the Collection<RequestEntryT> that is snapshotted in AsyncSinkWriter
Yes, if a record exceeds batch limits (count or size), we *don’t drop or
skip* it—we still track it in the buffer. The BatchCreator simply returns a
partial batch, and the oversized record will be part of the next batch or
flushed alone. AsyncSinkWriter.snapshotState() includes all remaining
buffered entries, including any that were not yet flushed.
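As a conceptual sketch of that behaviour (plain Java, not the actual
FLIP-509 interfaces): entries that do not fit in the current batch simply
stay buffered and are either flushed next or included in the snapshot:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class SizeAwareBatcher<T> {

    private final Deque<T> buffer = new ArrayDeque<>();
    private final int maxBatchCount; // only the count limit is shown; a byte limit works the same way

    SizeAwareBatcher(int maxBatchCount) {
        this.maxBatchCount = maxBatchCount;
    }

    void add(T entry) {
        buffer.addLast(entry); // never dropped, even if it exceeds the current batch limits
    }

    // Returns at most maxBatchCount entries; whatever does not fit stays buffered
    // and becomes part of the next batch (or is flushed alone).
    List<T> nextBatch() {
        List<T> batch = new ArrayList<>();
        while (!buffer.isEmpty() && batch.size() < maxBatchCount) {
            batch.add(buffer.pollFirst());
        }
        return batch;
    }

    // Everything still buffered is what snapshotState() must persist.
    List<T> snapshot() {
        return new ArrayList<>(buffer);
    }
}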
perfect, thanks for confirmation.
Thanks,
Poorvank
Best,
Etienne