[
https://issues.apache.org/jira/browse/FLINK-2624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14960814#comment-14960814
]
ASF GitHub Bot commented on FLINK-2624:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/1243#discussion_r42250317
--- Diff:
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
---
@@ -157,17 +179,35 @@ public void restoreState(SerializedCheckpointData[]
state) throws Exception {
@Override
public void notifyCheckpointComplete(long checkpointId) throws
Exception {
- for (Iterator<Tuple2<Long, List<Id>>> iter =
pendingCheckpoints.iterator(); iter.hasNext();) {
- Tuple2<Long, List<Id>> checkpoint = iter.next();
- long id = checkpoint.f0;
-
- if (id <= checkpointId) {
- acknowledgeIDs(checkpoint.f1);
- iter.remove();
- }
- else {
- break;
+ if (!running) {
+ LOG.debug("notifyCheckpointComplete() called on closed
source");
+ return;
+ }
+
+ // only one commit operation must be in progress
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Committing Messages externally for
checkpoint {}", checkpointId);
+ }
+
+ List<Id> idsOfCheckpointedMessages = new LinkedList<Id>();
--- End diff --
The extra copying into a list only to generate a log statement seems like
too much overhead.
Also, logging potentially millions of IDs the log (especially on INFO
level) is probably a bit overkill...
I would log the IDs on TRACE level only. Also, rather than copy the
individual lists into one lists, I would save the overhead and log one line per
list.
> RabbitMQ source / sink should participate in checkpointing
> ----------------------------------------------------------
>
> Key: FLINK-2624
> URL: https://issues.apache.org/jira/browse/FLINK-2624
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Affects Versions: 0.10
> Reporter: Stephan Ewen
> Assignee: Hilmi Yildirim
>
> The RabbitMQ connector does not offer any fault tolerance guarantees right
> now, because it does not participate in the checkpointing.
> We should integrate it in a similar was as the {{FlinkKafkaConsumer}} is
> integrated.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)