## What is the purpose of the change

This pull request rewrites `CassandraSinkBase` to use a `Phaser` and a `Semaphore` for proper synchronization, which makes it possible to support `maxConcurrentRequests` as a new configuration option. This improves the reliability of the Cassandra connector, which can currently overwhelm a weak Cassandra cluster if the upstream source has very high throughput.

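For reviewers unfamiliar with the pattern, here is a minimal sketch of how a `Semaphore` can cap in-flight requests while a `Phaser` lets a flush wait for all of them. It is an illustration only, not the code in this pull request: the class `ThrottledWriter`, its methods `trySend`, `flush`, and `availablePermits`, and the use of `CompletableFuture` in place of the Cassandra driver's result future are all assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration; not the actual CassandraSinkBase implementation. */
class ThrottledWriter {
    // One permit per allowed in-flight request.
    private final Semaphore permits;
    // Tracks in-flight requests; the single initial party is the writer itself.
    private final Phaser inFlight = new Phaser(1);
    private final long timeout;
    private final TimeUnit unit;

    ThrottledWriter(int maxConcurrentRequests, long timeout, TimeUnit unit) {
        this.permits = new Semaphore(maxConcurrentRequests);
        this.timeout = timeout;
        this.unit = unit;
    }

    /**
     * Tracks one asynchronous write. Returns false if no permit became
     * available within the timeout (the caller decides how to fail).
     */
    boolean trySend(CompletableFuture<?> pendingWrite) {
        try {
            if (!permits.tryAcquire(timeout, unit)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        inFlight.register(); // one extra party per in-flight request
        // Release the permit whether the write succeeds or fails.
        pendingWrite.whenComplete((result, error) -> {
            permits.release();
            inFlight.arriveAndDeregister();
        });
        return true;
    }

    /** Blocks until every tracked write has completed. */
    void flush() {
        inFlight.arriveAndAwaitAdvance();
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

In this sketch the permit is released in a single completion callback, so a failed write frees capacity just like a successful one. A caller would invoke `flush()` at points where all pending writes must be finished, for example before a checkpoint completes or when the sink closes.
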
## Brief change log

- Rewrote `CassandraSinkBase` to use a `Phaser` and `Semaphore`.
- Exposed `public void setMaxConcurrentRequests(int maxConcurrentRequests, long timeout, TimeUnit unit)` on `CassandraSinkBase`.
- Modified `CassandraSink` to accept the new configuration. It currently does not support the WAL.
- Updated the documentation about the new configuration.

## Verifying this change

This change is already covered by existing tests, such as `CassandraSinkBaseTest` and `CassandraConnectorITCase`.

This change added tests and can be verified as follows:

- Added tests for acquiring permits from the `Semaphore` and for releasing permits to the `Semaphore` when a write succeeds or fails.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs / JavaDocs

[ Full content available at: https://github.com/apache/flink/pull/6782 ]
