[
https://issues.apache.org/jira/browse/FLINK-4975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15648108#comment-15648108
]
ASF GitHub Bot commented on FLINK-4975:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2754#discussion_r87034115
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
@@ -525,81 +527,63 @@ else if (!props.forceCheckpoint()) {
}
/**
- * Receives a {@link DeclineCheckpoint} message and returns whether the
- * message was associated with a pending checkpoint.
+ * Receives a {@link DeclineCheckpoint} message for a pending checkpoint.
*
* @param message Checkpoint decline from the task manager
- *
- * @return Flag indicating whether the declined checkpoint was associated
- * with a pending checkpoint.
*/
- public boolean receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
+ public void receiveDeclineMessage(DeclineCheckpoint message) throws Exception {
if (shutdown || message == null) {
- return false;
+ return;
}
if (!job.equals(message.getJob())) {
- LOG.error("Received DeclineCheckpoint message for wrong job: {}", message);
- return false;
+ throw new IllegalArgumentException("Received DeclineCheckpoint message for job " +
+ message.getJob() + " while this coordinator handles job " + job);
}
final long checkpointId = message.getCheckpointId();
+ final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");
PendingCheckpoint checkpoint;
- // Flag indicating whether the ack message was for a known pending
- // checkpoint.
- boolean isPendingCheckpoint;
-
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
if (shutdown) {
- return false;
+ return;
}
checkpoint = pendingCheckpoints.get(checkpointId);
if (checkpoint != null && !checkpoint.isDiscarded()) {
- isPendingCheckpoint = true;
-
- LOG.info("Discarding checkpoint " + checkpointId
- + " because of checkpoint decline from task " + message.getTaskExecutionId());
+ LOG.info("Discarding checkpoint " + checkpointId + " because of checkpoint decline from task " +
--- End diff --
A parameterized logging statement using `{}` placeholders would be better here than string concatenation.
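For illustration: in the `{}` style that the removed `LOG.error("... wrong job: {}", message)` call already uses, the statement could read `LOG.info("Discarding checkpoint {} because of checkpoint decline from task {}", checkpointId, message.getTaskExecutionId())`, assuming the truncated `+` line still ends with `message.getTaskExecutionId()` as the removed line did. The sketch below shows why this matters. It is not Flink code: it uses `java.util.logging` so that it runs on the JDK alone (placeholders there are `{0}`, `{1}` rather than `{}`), and `ParameterizedLoggingSketch`, `TaskId`, and the helper methods are hypothetical names. It counts `toString()` calls on an argument while INFO logging is disabled.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

final class ParameterizedLoggingSketch {

    /** Hypothetical stand-in for a task id; counts how often it is rendered to a string. */
    static final class TaskId {
        static final AtomicInteger TO_STRING_CALLS = new AtomicInteger();
        private final String value;

        TaskId(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            TO_STRING_CALLS.incrementAndGet();
            return value;
        }
    }

    static final Logger LOG = Logger.getLogger("checkpoint-sketch");

    /** Concatenation: the message string is built before the logger can check its level. */
    static void logConcatenated(long checkpointId, TaskId task) {
        LOG.info("Discarding checkpoint " + checkpointId
                + " because of checkpoint decline from task " + task);
    }

    /** Parameterized: arguments are only formatted if the record is actually published. */
    static void logParameterized(long checkpointId, TaskId task) {
        LOG.log(Level.INFO,
                "Discarding checkpoint {0} because of checkpoint decline from task {1}",
                new Object[] {checkpointId, task});
    }

    /** Returns how many TaskId.toString() calls happened while running the action. */
    static int toStringCallsDuring(Runnable action) {
        int before = TaskId.TO_STRING_CALLS.get();
        action.run();
        return TaskId.TO_STRING_CALLS.get() - before;
    }

    public static void main(String[] args) {
        LOG.setLevel(Level.OFF); // simulate a deployment where INFO is disabled
        TaskId task = new TaskId("task-1");
        System.out.println(toStringCallsDuring(() -> logConcatenated(42L, task)));  // prints 1
        System.out.println(toStringCallsDuring(() -> logParameterized(42L, task))); // prints 0
    }
}
```

With the level disabled, the concatenated variant still pays for building the message, while the parameterized variant never touches its arguments. The parameterized form also keeps the message template on one line, which makes it easier to grep for.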
> Add a limit for how much data may be buffered during checkpoint alignment
> -------------------------------------------------------------------------
>
> Key: FLINK-4975
> URL: https://issues.apache.org/jira/browse/FLINK-4975
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.1.3
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.2.0, 1.1.4
>
>
> During checkpoint alignment, data may be buffered/spilled.
> We should introduce an upper limit for the spilled data volume. After
> exceeding that limit, the checkpoint alignment should abort and the
> checkpoint be canceled.