[ https://issues.apache.org/jira/browse/FLINK-4500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16142979#comment-16142979 ]
ASF GitHub Bot commented on FLINK-4500:
---------------------------------------
GitHub user mcfongtw opened a pull request:
https://github.com/apache/flink/pull/4605
[FLINK-4500] [C* Connector] CassandraSinkBase implements CheckpointedFunction
## What is the purpose of the change
Make CassandraSinkBase implement CheckpointedFunction so that all in-flight mutations have been sent to the C* sink and completed before a checkpoint is taken. As a result, the checkpoint is complete: it no longer covers writes that are still in flight and could fail afterwards.
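A minimal sketch of the idea, in plain Java so it runs without Flink or a Cassandra cluster. It assumes a `CompletableFuture` in place of the driver's `ResultSetFuture`, and the class `AsyncSinkSketch` with its `invoke`/`flush` methods is a hypothetical name, not the code in this pull request. The sink counts writes that have been sent but not yet completed; `flush()` blocks until that count reaches zero and then rethrows the first asynchronous failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/**
 * Hypothetical sketch, not Flink's CassandraSinkBase: records are written through an
 * async call returning a CompletableFuture (standing in for the driver's ResultSetFuture).
 * flush() blocks until no write is in flight; snapshotState() would call it.
 */
class AsyncSinkSketch<T> {
    private final Function<T, CompletableFuture<Void>> writer; // assumed async write call
    private final Object lock = new Object();
    private int pending = 0; // writes sent but not yet completed; guarded by lock
    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

    AsyncSinkSketch(Function<T, CompletableFuture<Void>> writer) {
        this.writer = writer;
    }

    /** Mirrors SinkFunction#invoke: fail fast on an earlier async error, then send. */
    void invoke(T value) throws Exception {
        checkAsyncErrors();
        synchronized (lock) {
            pending++;
        }
        CompletableFuture<Void> future;
        try {
            future = writer.apply(value);
        } catch (RuntimeException e) {
            writeFinished(); // the write never started, so it is no longer pending
            throw e;
        }
        future.whenComplete((ignored, error) -> {
            if (error != null) {
                firstFailure.compareAndSet(null, error); // keep only the first error
            }
            writeFinished();
        });
    }

    /** Called before a checkpoint (and on close): wait for in-flight writes, then report errors. */
    void flush() throws Exception {
        synchronized (lock) {
            while (pending > 0) {
                lock.wait();
            }
        }
        checkAsyncErrors();
    }

    int pendingCount() {
        synchronized (lock) {
            return pending;
        }
    }

    private void writeFinished() {
        synchronized (lock) {
            pending--;
            lock.notifyAll(); // wake a thread blocked in flush()
        }
    }

    private void checkAsyncErrors() throws Exception {
        Throwable error = firstFailure.get();
        if (error != null) {
            throw new Exception("An earlier asynchronous write failed", error);
        }
    }
}
```

In a Flink sink, a method like `flush()` would be called from `snapshotState(FunctionSnapshotContext)`, so the checkpoint can only complete after every earlier write has finished. The sketch assumes, as in Flink's task model, that `invoke()` and `snapshotState()` are not called concurrently, so no new writes are added while `flush()` waits.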
## Brief change log
* Implement CheckpointedFunction to (optionally) wait for all pending records to be flushed to the C* sink before a checkpoint is taken (or before the connection is closed).
* Add debugging messages to CassandraSinkBase.
* Add unit tests for simple / multi-threaded message dispatching in successful / failed scenarios.
* Add unit tests for the failure-handling logic on errors thrown at different stages.
* Add unit tests for flushing pending records when a checkpoint is taken.
* Provide Immediate / Delayed types of ResultSetFuture for testing purposes.
* Add CassandraBaseTest to the suppression list so it can use Guava imports.
* In the log4j-test settings, change the root log level to INFO and enable the ALL level for some test classes.
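The Immediate / Delayed test futures listed above can be approximated with JDK futures. This is a hedged sketch: `TestFutures` and its methods are hypothetical names, `CompletableFuture` stands in for `ResultSetFuture`, and `CompletableFuture.delayedExecutor` requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical test helpers mirroring "Immediate" and "Delayed" futures. */
final class TestFutures {
    private TestFutures() {
    }

    /** Already complete: callbacks registered later run at once in the registering thread. */
    static CompletableFuture<Void> immediateSuccess() {
        return CompletableFuture.completedFuture(null);
    }

    /** Already failed: exercises the error path without any timing involved. */
    static CompletableFuture<Void> immediateFailure(Throwable error) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        f.completeExceptionally(error);
        return f;
    }

    /** Completes (or fails, if errorOrNull is non-null) on another thread after delayMillis. */
    static CompletableFuture<Void> delayed(long delayMillis, Throwable errorOrNull) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS).execute(() -> {
            if (errorOrNull == null) {
                f.complete(null);
            } else {
                f.completeExceptionally(errorOrNull);
            }
        });
        return f;
    }
}
```

Both kinds are useful: an immediate future checks that a callback firing synchronously inside `send()` still updates the pending count correctly, while a delayed future keeps the write in flight long enough to verify that a checkpoint or `close()` actually blocks until it completes.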
## Verifying this change
This change is already covered by existing tests, such as
*CassandraBaseTest*.
This change added tests and can be verified as follows:
* Add unit tests for simple / multi-threaded message dispatching in successful / failed scenarios.
* Add unit tests for the failure-handling logic on errors thrown at different stages.
* Add unit tests for flushing pending records when a checkpoint is taken.
* Provide Immediate / Delayed types of ResultSetFuture for testing purposes.
* Add CassandraBaseTest to the suppression list so it can use Guava imports.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
## Documentation
- Does this pull request introduce a new feature? (yes / **no** (maybe))
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mcfongtw/flink FLINK-4500
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4605.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4605
----
commit caefe390bf2aaa22d996cc24a31a3ba76241fb23
Author: Michael Fong <[email protected]>
Date: 2017-08-14T12:57:06Z
[FLINK-4500] CassandraSinkBase implements CheckpointedFunction
* Implement CheckpointedFunction to (optionally) wait for all pending records to be flushed to the C* sink before taking a snapshot (or before closing the connection).
* Add debugging messages to CassandraSinkBase.
* Add unit tests for simple / multi-threaded message dispatching in successful / failed scenarios.
* Add unit tests for the failure-handling logic on errors thrown at different stages.
* Add unit tests for flushing pending records when a checkpoint is taken.
* Provide Immediate / Delayed types of ResultSetFuture for testing purposes.
* Add CassandraBaseTest to the suppression list so it can use Guava imports.
* In the log4j-test settings, change the root log level to INFO and enable the ALL level for some test classes.
----
> Cassandra sink can lose messages
> --------------------------------
>
> Key: FLINK-4500
> URL: https://issues.apache.org/jira/browse/FLINK-4500
> Project: Flink
> Issue Type: Bug
> Components: Cassandra Connector
> Affects Versions: 1.1.0
> Reporter: Elias Levy
> Assignee: Michael Fong
>
> The problem is the same as I pointed out with the Kafka producer sink
> (FLINK-4027). CassandraTupleSink's send() and CassandraPojoSink's send()
> both send data asynchronously to Cassandra and record whether an error occurs
> via a future callback. But CassandraSinkBase does not implement
> Checkpointed, so it can't hold back a checkpoint even though there are
> Cassandra queries issued before the checkpoint still in flight, and they may fail.
> If they do fail, they will not be replayed when the job recovers, and
> will thus be lost.
> In addition, CassandraSinkBase's close() should check whether there is a pending
> exception and throw it, rather than closing silently. It should also wait for any
> pending async queries to complete and check their status before closing.
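The `close()` behaviour requested in the issue can be sketched as below. This is an illustration under stated assumptions, not Flink code: `ClosingAsyncWriter`, `send` and `MAX_IN_FLIGHT` are hypothetical, `CompletableFuture` stands in for `ResultSetFuture`, and instead of a plain counter it swaps in a `Semaphore`, where each in-flight write holds one permit, so waiting for all writes means acquiring every permit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the requested close() semantics; not Flink code.
 * Each in-flight write holds one permit; close() waits for all of them to be
 * returned and then rethrows the first recorded failure.
 */
class ClosingAsyncWriter implements AutoCloseable {
    private static final int MAX_IN_FLIGHT = 1024; // assumed bound on concurrent writes
    private final Semaphore permits = new Semaphore(MAX_IN_FLIGHT);
    private final AtomicReference<Throwable> firstError = new AtomicReference<>();

    /** Starts one async write; blocks while MAX_IN_FLIGHT writes are already outstanding. */
    void send(Supplier<CompletableFuture<Void>> write) throws InterruptedException {
        permits.acquire();
        CompletableFuture<Void> future;
        try {
            future = write.get();
        } catch (RuntimeException e) {
            permits.release(); // the write never started
            throw e;
        }
        future.whenComplete((ignored, error) -> {
            if (error != null) {
                firstError.compareAndSet(null, error); // keep only the first error
            }
            permits.release();
        });
    }

    /** Waits for every outstanding write, then fails loudly instead of closing silently. */
    @Override
    public void close() throws Exception {
        permits.acquire(MAX_IN_FLIGHT); // succeeds only when no write holds a permit
        permits.release(MAX_IN_FLIGHT);
        Throwable error = firstError.get();
        if (error != null) {
            throw new Exception("Asynchronous write failed before close", error);
        }
    }
}
```

A side effect of the semaphore is that it also caps the number of concurrent requests, which a plain counter does not; the trade-off is that `send()` blocks once that cap is reached.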
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)