## What is the purpose of the change

This pull request rewrites `CassandraSinkBase` to use a `Phaser` and a 
`Semaphore`, which provide the synchronization needed to support a new 
`maxConcurrentRequests` configuration. This improves the reliability of the 
Cassandra Connector, which can currently overwhelm a weak Cassandra cluster 
when the upstream source has very high throughput.

## Brief change log

  - Rewrote `CassandraSinkBase` to use a `Phaser` and a `Semaphore`.
  - Exposed `public void setMaxConcurrentRequests(int maxConcurrentRequests, long timeout, TimeUnit unit)` on `CassandraSinkBase`.
  - Modified `CassandraSink` to expose the new configuration. This does not 
yet support the WAL.
  - Updated the documentation to describe the new configuration.
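The parameter list of `setMaxConcurrentRequests(int, long, TimeUnit)` suggests a limit on concurrent requests plus a bound on how long a write may wait for a free slot. The sketch below shows one plausible reading of those semantics. The class `ConcurrencyLimit`, its validation rules, and its effectively unbounded defaults are hypothetical assumptions, not Flink's implementation.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch mirroring the parameter list of the new setter; not Flink code.
class ConcurrencyLimit {
    // Assumed defaults: effectively unbounded concurrency and waiting time.
    private Semaphore permits = new Semaphore(Integer.MAX_VALUE);
    private long timeout = Long.MAX_VALUE;
    private TimeUnit unit = TimeUnit.MILLISECONDS;

    void setMaxConcurrentRequests(int maxConcurrentRequests, long timeout, TimeUnit unit) {
        if (maxConcurrentRequests <= 0) {
            throw new IllegalArgumentException("maxConcurrentRequests must be positive");
        }
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        this.permits = new Semaphore(maxConcurrentRequests);
        this.timeout = timeout;
        this.unit = unit;
    }

    // Waits at most the configured timeout for a free slot, then gives up.
    void acquire() throws InterruptedException, TimeoutException {
        if (!permits.tryAcquire(timeout, unit)) {
            throw new TimeoutException(
                    "Failed to acquire a permit within " + timeout + " " + unit);
        }
    }

    void release() {
        permits.release();
    }
}
```

Failing with a `TimeoutException` instead of blocking forever means a stalled cluster surfaces as a job error, not as a silently hung task.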

## Verifying this change

This change is already covered by existing tests, such as 
`CassandraSinkBaseTest` and `CassandraConnectorITCase`.

This change added tests and can be verified as follows:

  - Added tests for acquiring permits from the `Semaphore` and for releasing 
them when a write succeeds or fails.
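The property these tests check can be shown in isolation: a permit is held while a write is in flight and returned whether the write succeeds or fails. The sketch below is hypothetical. `PermitGuard` is not a class in the PR, `CompletableFuture` stands in for the driver's future, and keeping the first failure is an assumed way to let a caller surface it later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical harness for the success/failure permit-release property; not Flink code.
class PermitGuard {
    private final Semaphore permits;
    // Keeps the first asynchronous failure so a caller could rethrow it later.
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    PermitGuard(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    void submit(CompletableFuture<?> write) throws InterruptedException {
        permits.acquire();
        write.whenComplete((result, failure) -> {
            if (failure != null) {
                error.compareAndSet(null, failure);
            }
            permits.release(); // returned on both success and failure
        });
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    Throwable firstError() {
        return error.get();
    }
}
```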

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs / JavaDocs


[ Full content available at: https://github.com/apache/flink/pull/6782 ]