[ https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15628522#comment-15628522 ]
ASF GitHub Bot commented on FLINK-4939:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2707#discussion_r86106025
--- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---
@@ -118,16 +117,32 @@ public void close() throws Exception {
}
@Override
- public void commitCheckpoint(long checkpointID) {
- session.execute(updateStatement.bind(checkpointID));
- this.lastCommittedCheckpointID = checkpointID;
+ public void commitCheckpoint(int subtaskIdx, long checkpointId) {
+ String statement = String.format(
+ "UPDATE %s.%s set checkpoint_id=%d where sink_id='%s' and sub_id=%d;",
+ keySpace, table, checkpointId, operatorId, subtaskIdx);
+
+ session.execute(statement);
}
@Override
- public boolean isCheckpointCommitted(long checkpointID) {
- if (this.lastCommittedCheckpointID == -1) {
- this.lastCommittedCheckpointID = session.execute(selectStatement.bind()).one().getLong("checkpoint_id");
+ public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
+
+ // Pending checkpointed buffers are committed in ascending order of their
+ // checkpoint id. This way we can tell if a checkpointed buffer was committed
+ // just by asking the third party storage system for the last checkpointed
+ // checkpoint id committed by the specific subtask.
+
+ Long lastCommittedCheckpoint = lastCommittedCheckpoints.get(subtaskIdx);
+ if (lastCommittedCheckpoint == null || lastCommittedCheckpoint == -1) {
--- End diff --
We don't need to put -1 into the map when no checkpoint has been committed;
the map returning null already serves the same purpose.
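
To illustrate the suggestion, here is a minimal sketch in which a missing map entry is the only "nothing committed yet" signal, so no -1 sentinel is ever stored. The class name NullAwareCommitter and the in-memory `store` map standing in for Cassandra are hypothetical; the `checkpointId <= last` comparison is an assumption drawn from the code comment about ascending commit order, since the quoted diff is cut off before that point.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the committer; not the actual CassandraCommitter.
class NullAwareCommitter {
    // Simulated third-party store: subtask index -> last committed checkpoint id.
    private final Map<Integer, Long> store = new HashMap<>();
    // Local cache; an absent entry (null) means nothing is known for that subtask.
    private final Map<Integer, Long> lastCommittedCheckpoints = new HashMap<>();

    void commitCheckpoint(int subtaskIdx, long checkpointId) {
        store.put(subtaskIdx, checkpointId);
        lastCommittedCheckpoints.put(subtaskIdx, checkpointId);
    }

    boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
        Long last = lastCommittedCheckpoints.get(subtaskIdx);
        if (last == null) {
            // Cache miss: fall back to the (simulated) external store.
            last = store.get(subtaskIdx);
            if (last == null) {
                // Nothing committed for this subtask; no -1 is written to the cache.
                return false;
            }
            lastCommittedCheckpoints.put(subtaskIdx, last);
        }
        // Assumes commits happen in ascending checkpoint-id order.
        return checkpointId <= last;
    }
}
```

With this shape the `== -1` branch in the condition becomes unnecessary, because the cache only ever holds real checkpoint ids.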
> GenericWriteAheadSink: Decouple the creating from the committing subtask for
> a pending checkpoint
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-4939
> URL: https://issues.apache.org/jira/browse/FLINK-4939
> Project: Flink
> Issue Type: Improvement
> Components: Cassandra Connector
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Until now, the GenericWriteAheadSink has assumed that
> the subtask that wrote a pending checkpoint to the
> state backend is also the one that commits it to
> the third-party storage system.
> This issue aims to remove that assumption. To do this,
> the CheckpointCommitter must accept the subtaskIdx
> as a parameter when asked whether a checkpoint was
> committed, and the state kept by the GenericWriteAheadSink
> must also include the index of the subtask that wrote
> the pending checkpoint.
> This change is also necessary for making the operator rescalable.
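
The decoupling described in the issue can be sketched roughly as follows. This is an illustration under stated assumptions, not the code from the pull request: SubtaskAwareCommitter, PendingCheckpoint, InMemoryCommitter, and PendingCheckpointReplayer are hypothetical names, and an in-memory map replaces the third-party storage system. The point is that each pending checkpoint carries the index of the subtask that wrote it, so any subtask can later commit it on that writer's behalf.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical committer contract that takes the subtask index per call.
interface SubtaskAwareCommitter {
    void commitCheckpoint(int subtaskIdx, long checkpointId);
    boolean isCheckpointCommitted(int subtaskIdx, long checkpointId);
}

// Pending checkpoint state that records which subtask wrote it.
final class PendingCheckpoint {
    final long checkpointId;
    final int subtaskIdx;

    PendingCheckpoint(long checkpointId, int subtaskIdx) {
        this.checkpointId = checkpointId;
        this.subtaskIdx = subtaskIdx;
    }
}

// In-memory stand-in for the external storage system.
class InMemoryCommitter implements SubtaskAwareCommitter {
    private final Map<Integer, Long> lastCommitted = new HashMap<>();

    @Override
    public void commitCheckpoint(int subtaskIdx, long checkpointId) {
        lastCommitted.put(subtaskIdx, checkpointId);
    }

    @Override
    public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
        Long last = lastCommitted.get(subtaskIdx);
        return last != null && checkpointId <= last;
    }
}

class PendingCheckpointReplayer {
    // Commits every not-yet-committed pending checkpoint in ascending id order,
    // using the writer's subtask index rather than the caller's.
    // Returns the number of checkpoints committed by this call.
    static int commitPending(List<PendingCheckpoint> pending, SubtaskAwareCommitter committer) {
        List<PendingCheckpoint> sorted = new ArrayList<>(pending);
        sorted.sort(Comparator.comparingLong(p -> p.checkpointId));
        int committed = 0;
        for (PendingCheckpoint p : sorted) {
            if (!committer.isCheckpointCommitted(p.subtaskIdx, p.checkpointId)) {
                committer.commitCheckpoint(p.subtaskIdx, p.checkpointId);
                committed++;
            }
        }
        return committed;
    }
}
```

Because the lookup key is the writer's index, redistributing pending checkpoints across a different number of subtasks after rescaling does not change which ones are considered committed.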