[ https://issues.apache.org/jira/browse/FLINK-4502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Flink Jira Bot updated FLINK-4502:
----------------------------------
      Labels: auto-deprioritized-major auto-deprioritized-minor  (was: auto-deprioritized-major stale-minor)
    Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Cassandra connector documentation has misleading consistency guarantees
> -----------------------------------------------------------------------
>
>                 Key: FLINK-4502
>                 URL: https://issues.apache.org/jira/browse/FLINK-4502
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Cassandra, Documentation
>    Affects Versions: 1.1.0
>            Reporter: Elias Levy
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> The Cassandra connector documentation states that  "enableWriteAheadLog() is 
> an optional method, that allows exactly-once processing for non-deterministic 
> algorithms."  This claim appears to be false.
> From what I gather, the write ahead log feature of the connector works as 
> follows:
> - The sink is replaced with a stateful operator that writes incoming messages 
> to the state backend based on the checkpoint they belong to.
> - When the operator is notified that a Flink checkpoint has been completed, 
> then for each checkpoint that is older than or equal to the completed one it:
>   * reads its messages from the state backend
>   * writes them to Cassandra
>   * records that it has committed them to Cassandra for the specific 
> checkpoint and operator instance
>   * and erases them from the state backend.
> This process attempts to avoid the resubmission of queries to Cassandra that 
> would otherwise occur when a job is recovered from a checkpoint and messages 
> are replayed; a simplified sketch of this commit loop follows below.
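> To make the above concrete, here is a minimal sketch of that commit loop.  The 
> class, field and method names below are simplified stand-ins rather than the 
> actual Flink / connector API; only the ordering of the steps is taken from the 
> behaviour described above.
> {code:java}
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
> 
> // Hypothetical stand-ins for the real Flink classes; only the ordering of
> // the steps matters here.
> interface CheckpointCommitter {
>     boolean isCheckpointCommitted(long checkpointId);
>     void commitCheckpoint(long checkpointId);
> }
> 
> class PendingCheckpoint {
>     final long checkpointId;
>     final List<Object> bufferedRecords; // what was written to the state backend
>     PendingCheckpoint(long checkpointId, List<Object> bufferedRecords) {
>         this.checkpointId = checkpointId;
>         this.bufferedRecords = bufferedRecords;
>     }
> }
> 
> abstract class WriteAheadSinkSketch {
>     protected final List<PendingCheckpoint> pendingCheckpoints = new ArrayList<>();
>     protected final CheckpointCommitter committer;
> 
>     protected WriteAheadSinkSketch(CheckpointCommitter committer) {
>         this.committer = committer;
>     }
> 
>     // Writes the buffered records to Cassandra; returns false if any write
>     // failed.  The writes are neither atomic nor necessarily idempotent.
>     protected abstract boolean sendValues(Iterable<Object> records, long checkpointId);
> 
>     public void notifyCheckpointComplete(long completedCheckpointId) {
>         Iterator<PendingCheckpoint> it = pendingCheckpoints.iterator();
>         while (it.hasNext()) {
>             PendingCheckpoint pending = it.next();
>             if (pending.checkpointId > completedCheckpointId) {
>                 continue; // only checkpoints up to the completed one are committed
>             }
>             if (committer.isCheckpointCommitted(pending.checkpointId)) {
>                 it.remove(); // already committed earlier; just drop the buffer
>                 continue;
>             }
>             // 1. write the buffered records for this checkpoint to Cassandra
>             boolean success = sendValues(pending.bufferedRecords, pending.checkpointId);
>             if (success) {
>                 // 2. record the commit for this checkpoint and operator instance
>                 committer.commitCheckpoint(pending.checkpointId);
>                 // 3. erase the buffered records
>                 it.remove();
>             }
>             // A crash between step 1 and step 2 means the same records are
>             // written to Cassandra again after recovery.
>         }
>     }
> }
> {code}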
> Alas, this does not guarantee exactly-once semantics at the sink.  The writes 
> to Cassandra that occur when the operator is notified that a checkpoint has 
> been completed are not atomic, and they are potentially non-idempotent.  If 
> the job dies while writing to Cassandra, or before the checkpoint is committed 
> via the committer, the queries will be replayed when the job recovers.  Thus 
> the documentation appears to be incorrect in stating that this provides 
> exactly-once semantics.
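> To make "potentially non-idempotent" concrete: a plain INSERT keyed by primary 
> key can be replayed harmlessly, whereas a counter update cannot.  The table 
> and column names below are made up for illustration.
> {code:java}
> // Illustration only; the tables and columns are hypothetical.
> class IdempotencyExample {
>     // Replaying this after a partial failure rewrites the same row with the
>     // same values, so the end state is unchanged.
>     static final String IDEMPOTENT_WRITE =
>         "INSERT INTO events (id, payload) VALUES (?, ?)";
> 
>     // Replaying this adds 1 again on every retry, so a crash between writing
>     // to Cassandra and committing the checkpoint corrupts the count.
>     static final String NON_IDEMPOTENT_WRITE =
>         "UPDATE event_counts SET cnt = cnt + 1 WHERE id = ?";
> }
> {code}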
> There also seems to be an issue in GenericWriteAheadSink's 
> notifyOfCompletedCheckpoint which may result in incorrect output.  If 
> sendValues returns false because a write failed, then instead of bailing out 
> it simply moves on to the next checkpoint to commit, if there is one, keeping 
> the previous one around to try again later.  In principle that could result 
> in newer data being overwritten with older data when the previous checkpoint 
> is retried.  In practice, however, given that CassandraCommitter implements 
> isCheckpointCommitted as checkpointID <= this.lastCommittedCheckpointID, when 
> the sink goes back to the uncommitted older checkpoint it will consider it 
> already committed, even though some of its data may never have been written 
> out, and that data will be discarded.
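> A simplified version of that committed-check shows why the retried checkpoint 
> is silently dropped.  The comparison is taken from the description above; the 
> class shape around it is a hypothetical stand-in.
> {code:java}
> // Simplified stand-in for the committer described above.
> class CassandraCommitterSketch {
>     private long lastCommittedCheckpointId = -1;
> 
>     public void commitCheckpoint(long checkpointId) {
>         lastCommittedCheckpointId = checkpointId;
>     }
> 
>     // Any checkpoint id at or below the highest committed one is treated as
>     // committed.  If checkpoint 5 failed in sendValues() but checkpoint 6 was
>     // then committed, a later retry of checkpoint 5 sees 5 <= 6, assumes its
>     // data was already written, and discards it.
>     public boolean isCheckpointCommitted(long checkpointId) {
>         return checkpointId <= lastCommittedCheckpointId;
>     }
> }
> {code}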



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
