[ https://issues.apache.org/jira/browse/FLINK-4939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15628530#comment-15628530 ]
ASF GitHub Bot commented on FLINK-4939:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2707#discussion_r86107282
--- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java ---
@@ -95,16 +100,10 @@ public void open() throws Exception {
}
cluster = builder.getCluster();
session = cluster.connect();
-
- updateStatement = session.prepare(String.format("UPDATE %s.%s set checkpoint_id=? where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
- selectStatement = session.prepare(String.format("SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", keySpace, table, operatorId, subtaskId));
-
- session.execute(String.format("INSERT INTO %s.%s (sink_id, sub_id, checkpoint_id) values ('%s', %d, " + -1 + ") IF NOT EXISTS;", keySpace, table, operatorId, subtaskId));
}
@Override
public void close() throws Exception {
- this.lastCommittedCheckpointID = -1;
--- End diff --
Where is the replacement for this line? (The reset is useful for testing.)
> GenericWriteAheadSink: Decouple the creating from the committing subtask for
> a pending checkpoint
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-4939
> URL: https://issues.apache.org/jira/browse/FLINK-4939
> Project: Flink
> Issue Type: Improvement
> Components: Cassandra Connector
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> So far, the GenericWriteAheadSink has assumed that the subtask
> that wrote a pending checkpoint to the state backend is also
> the one that commits it to the third-party storage system.
> This issue aims to remove that assumption. To do so, the
> CheckpointCommitter must accept the subtaskIdx as a parameter
> when asked whether a checkpoint was committed, and the state
> kept by the GenericWriteAheadSink must also include the index
> of the subtask that wrote the pending checkpoint.
> This change is also necessary for making the operator rescalable.
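
To illustrate the idea in the description, here is a minimal sketch of a committer that takes the subtask index as an argument on every call instead of fixing it when the committer is opened. The class name InMemoryCheckpointCommitter and the in-memory map are hypothetical stand-ins for illustration, not the code in the pull request. The sketch also assumes that checkpoint ids only grow per subtask, so remembering the highest committed id is enough.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a CheckpointCommitter (illustration only).
class InMemoryCheckpointCommitter {

    // Highest committed checkpoint id, tracked separately for each subtask index.
    private final Map<Integer, Long> lastCommitted = new HashMap<>();

    // Records that the given subtask's checkpoint has been committed.
    // Any subtask may call this on behalf of the subtask that wrote the checkpoint.
    void commitCheckpoint(int subtaskIdx, long checkpointId) {
        lastCommitted.merge(subtaskIdx, checkpointId, Long::max);
    }

    // Assumption: ids increase monotonically per subtask, so a checkpoint counts
    // as committed if its id does not exceed the highest committed id.
    boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
        Long last = lastCommitted.get(subtaskIdx);
        return last != null && checkpointId <= last;
    }
}
```

Because the subtask index is passed per call, a subtask that restores a pending checkpoint written by another subtask can still ask whether that checkpoint was committed, which is the decoupling the issue describes.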