[ https://issues.apache.org/jira/browse/FLINK-3261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15114940#comment-15114940 ]
ASF GitHub Bot commented on FLINK-3261:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/1537#discussion_r50669903
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
@@ -505,6 +506,88 @@ else if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
     }
     /**
+     * Receives a {@link DeclineCheckpoint} message and returns whether the
+     * message was associated with a pending checkpoint.
+     *
+     * @param message Checkpoint decline from the task manager
+     *
+     * @return Flag indicating whether the declined checkpoint was associated
+     * with a pending checkpoint.
+     */
+
+    public boolean receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
+        if (shutdown || message == null) {
+            return false;
+        }
+        if (!job.equals(message.getJob())) {
+            LOG.error("Received DeclineCheckpoint message for wrong job: {}", message);
+            return false;
+        }
+
+        final long checkpointId = message.getCheckpointId();
+
+        CompletedCheckpoint completed = null;
+        PendingCheckpoint checkpoint;
+
+        // Flag indicating whether the ack message was for a known pending
+        // checkpoint.
+        boolean isPendingCheckpoint;
+
+        synchronized (lock) {
+            // we need to check inside the lock for being shutdown as well, otherwise we
+            // get races and invalid error log messages
+            if (shutdown) {
+                return false;
+            }
+
+            checkpoint = pendingCheckpoints.get(checkpointId);
+
+            if (checkpoint != null && !checkpoint.isDiscarded()) {
+                isPendingCheckpoint = true;
+
+                LOG.info("Discarding checkpoint " + checkpointId
+                    + " because of checkpoint decline from task " + message.getTaskExecutionId());
+
+                pendingCheckpoints.remove(checkpointId);
+                checkpoint.discard(userClassLoader);
+                rememberRecentCheckpointId(checkpointId);
+
+                boolean haveMoreRecentPending = false;
+                Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
+                while (entries.hasNext()) {
+                    PendingCheckpoint p = entries.next().getValue();
+                    if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
+                        haveMoreRecentPending = true;
+                        break;
+                    }
+                }
+                if (!haveMoreRecentPending && !triggerRequestQueued) {
+                    LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
+                    triggerCheckpoint(System.currentTimeMillis());
+                } else if (!haveMoreRecentPending) {
+                    LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
+                    triggerQueuedRequests();
+                }
+
--- End diff --
empty line
> Tasks should eagerly report back when they cannot start a checkpoint
> --------------------------------------------------------------------
>
> Key: FLINK-3261
> URL: https://issues.apache.org/jira/browse/FLINK-3261
> Project: Flink
> Issue Type: Bug
> Components: Distributed Runtime
> Affects Versions: 0.10.1
> Reporter: Stephan Ewen
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.0.0
>
>
> With very short checkpoint intervals (a few hundred milliseconds), a Task
> may not be ready to start a checkpoint by the time it receives the first
> checkpoint trigger message.
> If other tasks are already ready and begin the checkpoint, stream alignment
> makes the non-participating task wait until the checkpoint expires
> (default: 10 minutes).
> A simple fix is for tasks to report back when they cannot start a
> checkpoint. The checkpoint coordinator can then abort that checkpoint and
> unblock the streams by starting a new checkpoint (in which all tasks will
> participate).
> An optimization would be to send a special "abort checkpoints barrier" that
> tells the barrier buffers for stream alignment to unblock a checkpoint.
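
The report-back behaviour described above can be sketched roughly as follows. This is a minimal illustration, not Flink's actual CheckpointCoordinator: the class DeclineSketch and its methods are hypothetical, pending checkpoints are reduced to a map from checkpoint id to trigger timestamp, and queued trigger requests, discarding of state, and logging are left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified coordinator; an illustration only, not Flink code. */
class DeclineSketch {

    // Pending checkpoints: checkpoint id -> trigger timestamp (assumed simplification).
    private final Map<Long, Long> pending = new HashMap<>();
    private long nextCheckpointId = 1;

    /** Registers a new pending checkpoint and returns its id. */
    public synchronized long triggerCheckpoint(long timestamp) {
        long id = nextCheckpointId++;
        pending.put(id, timestamp);
        return id;
    }

    /**
     * Handles a decline from a task. Returns true if the declined checkpoint
     * was still pending, false if it was unknown or already removed.
     */
    public synchronized boolean receiveDecline(long checkpointId, long now) {
        Long declinedTimestamp = pending.remove(checkpointId);
        if (declinedTimestamp == null) {
            return false;
        }
        // Start a replacement only if no pending checkpoint is at least as recent.
        boolean haveMoreRecentPending = false;
        for (long timestamp : pending.values()) {
            if (timestamp >= declinedTimestamp) {
                haveMoreRecentPending = true;
                break;
            }
        }
        if (!haveMoreRecentPending) {
            triggerCheckpoint(now);
        }
        return true;
    }

    public synchronized boolean isPending(long checkpointId) {
        return pending.containsKey(checkpointId);
    }

    public synchronized int numPending() {
        return pending.size();
    }
}
```

In this sketch, declining the only pending checkpoint removes it and immediately registers a replacement, so the streams would not stay blocked until the timeout. A second decline for the same id is ignored.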
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)