[
https://issues.apache.org/jira/browse/FLINK-2624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14960812#comment-14960812
]
ASF GitHub Bot commented on FLINK-2624:
---------------------------------------
Github user StephanEwen commented on a diff in the pull request:
https://github.com/apache/flink/pull/1243#discussion_r42250017
--- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java ---
@@ -157,17 +179,35 @@ public void restoreState(SerializedCheckpointData[] state) throws Exception {
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        for (Iterator<Tuple2<Long, List<Id>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
-            Tuple2<Long, List<Id>> checkpoint = iter.next();
-            long id = checkpoint.f0;
-
-            if (id <= checkpointId) {
-                acknowledgeIDs(checkpoint.f1);
-                iter.remove();
-            }
-            else {
-                break;
+        if (!running) {
+            LOG.debug("notifyCheckpointComplete() called on closed source");
+            return;
+        }
+
+        // only one commit operation must be in progress
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Committing Messages externally for checkpoint {}", checkpointId);
+        }
+
+        List<Id> idsOfCheckpointedMessages = new LinkedList<Id>();
+        synchronized (pendingCheckpoints) {
+            for (Iterator<Tuple2<Long, List<Id>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
+                Tuple2<Long, List<Id>> checkpoint = iter.next();
+                long id = checkpoint.f0;
+
+                if (id <= checkpointId) {
+                    idsForCurrentCheckpoint.addAll(checkpoint.f1);
+                    acknowledgeIDs(checkpoint.f1);
+                    iter.remove();
+                }
+                else {
+                    break;
+                }
             }
         }
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Committing Messages with following IDs {}", idsOfCheckpointedMessages);
--- End diff --
In most cases this will generate a gigantic log line that may stall the
whole system.
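One possible way to keep the line bounded, as a rough sketch only: log the number of committed IDs plus a small sample instead of the full list. The names `CommitLogSummary` and `summarize` below are hypothetical and exist only for illustration; they are not part of Flink or of this pull request.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Flink): builds a bounded summary of an ID
// list so that a log statement never has to print every element.
final class CommitLogSummary {

    // Returns "<n> IDs" followed by at most maxShown leading elements.
    static String summarize(List<?> ids, int maxShown) {
        // Clamp the sample size to the range [0, ids.size()].
        int shown = Math.min(Math.max(maxShown, 0), ids.size());
        StringBuilder sb = new StringBuilder();
        sb.append(ids.size()).append(" IDs");
        if (shown > 0) {
            sb.append(", first ").append(shown).append(": ").append(ids.subList(0, shown));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("a", "b", "c", "d");
        // The summary grows with maxShown, not with the size of the list.
        System.out.println("Committing messages: " + summarize(ids, 2));
    }
}
```

In the method under review, the summary string would take the place of the raw list in the log call, and printing the complete list could be left to a debug- or trace-level statement if it is needed at all.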
> RabbitMQ source / sink should participate in checkpointing
> ----------------------------------------------------------
>
> Key: FLINK-2624
> URL: https://issues.apache.org/jira/browse/FLINK-2624
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Affects Versions: 0.10
> Reporter: Stephan Ewen
> Assignee: Hilmi Yildirim
>
> The RabbitMQ connector does not offer any fault tolerance guarantees right
> now, because it does not participate in the checkpointing.
> We should integrate it in a similar way as the {{FlinkKafkaConsumer}} is
> integrated.