[
https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15628524#comment-15628524
]
ASF GitHub Bot commented on FLINK-4939:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2707#discussion_r86106408
--- Diff:
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
---
@@ -118,16 +117,32 @@ public void close() throws Exception {
}
@Override
- public void commitCheckpoint(long checkpointID) {
- session.execute(updateStatement.bind(checkpointID));
- this.lastCommittedCheckpointID = checkpointID;
+ public void commitCheckpoint(int subtaskIdx, long checkpointId) {
+ String statement = String.format(
+ "UPDATE %s.%s set checkpoint_id=%d where sink_id='%s' and sub_id=%d;",
+ keySpace, table, checkpointId, operatorId, subtaskIdx);
+
+ session.execute(statement);
--- End diff --
You forgot to update `lastCommittedCheckpointID` here. It will now only
be updated once, in `isCheckpointCommitted`.
As a follow-up, the test for this behavior should be extended to cover more
than one commit.
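To illustrate the point, here is a minimal sketch of a committer that refreshes its local cache on every commit, so that `isCheckpointCommitted` does not keep answering from a stale value. The class name `CachingCommitterSketch`, the in-memory `externalStore` map (a stand-in for the Cassandra table), and the `storeReads` counter are all hypothetical and are not part of the pull request; the sketch also assumes checkpoints are committed in ascending id order per subtask.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in; not the actual CassandraCommitter. */
class CachingCommitterSketch {
    // Stands in for the external table, keyed by subtask index (assumption).
    private final Map<Integer, Long> externalStore = new HashMap<>();
    // Local cache of the last committed checkpoint id per subtask.
    private final Map<Integer, Long> lastCommitted = new HashMap<>();
    // Counts lookups against the stand-in store, so tests can observe cache hits.
    int storeReads = 0;

    void commitCheckpoint(int subtaskIdx, long checkpointId) {
        externalStore.put(subtaskIdx, checkpointId);
        // The step the review comment asks for: keep the cache in sync on commit.
        lastCommitted.put(subtaskIdx, checkpointId);
    }

    boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
        Long last = lastCommitted.get(subtaskIdx);
        if (last == null) {
            // Cache miss: fall back to the (stand-in) external store.
            storeReads++;
            last = externalStore.get(subtaskIdx);
            if (last != null) {
                lastCommitted.put(subtaskIdx, last);
            }
        }
        // Relies on the ascending-commit-order assumption stated above.
        return last != null && checkpointId <= last;
    }
}
```

A test that commits more than once per subtask, as suggested above, would catch a cache that is only filled on the first lookup.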
> GenericWriteAheadSink: Decouple the creating from the committing subtask for
> a pending checkpoint
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-4939
> URL: https://issues.apache.org/jira/browse/FLINK-4939
> Project: Flink
> Issue Type: Improvement
> Components: Cassandra Connector
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> So far the GenericWriteAheadSink has assumed that the subtask
> that wrote a pending checkpoint to the state backend is also
> the one that commits it to the third-party storage system.
> This issue aims to remove that assumption. To do so, the
> CheckpointCommitter must accept the subtaskIdx as a parameter
> when asked whether a checkpoint was committed, and the state
> kept by the GenericWriteAheadSink must also include the index
> of the subtask that wrote the pending checkpoint.
> This change is also necessary for making the operator rescalable.
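
The decoupling described in the issue could look roughly like the sketch below. Every type name here (`SubtaskAwareCommitter`, `PendingCheckpointEntry`, `MapBackedCommitter`, `RestoreSketch`) is hypothetical and chosen for illustration only; this is not the GenericWriteAheadSink implementation. It assumes pending entries are processed in ascending checkpoint id order per subtask.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical committer contract: every call names the subtask explicitly. */
interface SubtaskAwareCommitter {
    void commitCheckpoint(int subtaskIdx, long checkpointId);
    boolean isCheckpointCommitted(int subtaskIdx, long checkpointId);
}

/** Hypothetical pending-checkpoint record that remembers which subtask wrote it. */
final class PendingCheckpointEntry {
    final long checkpointId;
    final int writerSubtaskIdx;

    PendingCheckpointEntry(long checkpointId, int writerSubtaskIdx) {
        this.checkpointId = checkpointId;
        this.writerSubtaskIdx = writerSubtaskIdx;
    }
}

/** Trivial in-memory committer, used only to exercise the sketch. */
final class MapBackedCommitter implements SubtaskAwareCommitter {
    private final Map<Integer, Long> lastCommitted = new HashMap<>();

    @Override
    public void commitCheckpoint(int subtaskIdx, long checkpointId) {
        lastCommitted.put(subtaskIdx, checkpointId);
    }

    @Override
    public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
        Long last = lastCommitted.get(subtaskIdx);
        return last != null && checkpointId <= last;
    }
}

final class RestoreSketch {
    /**
     * Commits every pending entry that is not yet committed, using the index of
     * the subtask that wrote it rather than the index of the subtask running
     * this code. Returns the checkpoint ids committed by this call.
     */
    static List<Long> commitPending(List<PendingCheckpointEntry> pending,
                                    SubtaskAwareCommitter committer) {
        List<Long> committedNow = new ArrayList<>();
        for (PendingCheckpointEntry p : pending) {
            if (!committer.isCheckpointCommitted(p.writerSubtaskIdx, p.checkpointId)) {
                // A real sink would first write the buffered records to the
                // external system; that step is omitted here.
                committer.commitCheckpoint(p.writerSubtaskIdx, p.checkpointId);
                committedNow.add(p.checkpointId);
            }
        }
        return committedNow;
    }
}
```

Because the writer's index travels with the pending entry, any subtask that ends up holding the entry after a restore can ask about it and commit it.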