This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1e7d45d53b7ea7b9cfadf2e293ba790f3a9e90c3 Author: Anton Kalashnikov <[email protected]> AuthorDate: Wed Feb 23 16:41:20 2022 +0100 [FLINK-25958][refactor][runtime] Separated the logic of creating and reporting the statistic in order to use it in different place in the future --- .../runtime/checkpoint/CheckpointCoordinator.java | 35 ++++---- .../runtime/checkpoint/CheckpointStatsTracker.java | 43 +-------- .../flink/runtime/checkpoint/Checkpoints.java | 3 +- .../runtime/checkpoint/CompletedCheckpoint.java | 31 ++++--- .../checkpoint/CompletedCheckpointStats.java | 24 +---- .../runtime/checkpoint/FailedCheckpointStats.java | 17 ---- .../runtime/checkpoint/PendingCheckpoint.java | 65 ++++++++------ .../runtime/checkpoint/PendingCheckpointStats.java | 90 +++++++------------ .../CheckpointCoordinatorMasterHooksTest.java | 6 +- .../CheckpointCoordinatorRestoringTest.java | 9 +- .../checkpoint/CheckpointCoordinatorTest.java | 3 +- .../checkpoint/CheckpointStateRestoreTest.java | 6 +- .../checkpoint/CheckpointStatsTrackerTest.java | 16 ++-- .../checkpoint/CompletedCheckpointStoreTest.java | 3 +- .../checkpoint/CompletedCheckpointTest.java | 62 +++++++++---- .../DefaultCompletedCheckpointStoreTest.java | 3 +- .../DefaultCompletedCheckpointStoreUtilsTest.java | 3 +- .../checkpoint/PendingCheckpointStatsTest.java | 32 ++----- .../runtime/checkpoint/PendingCheckpointTest.java | 100 +++------------------ .../StandaloneCompletedCheckpointStoreTest.java | 3 +- .../ZooKeeperCompletedCheckpointStoreTest.java | 3 +- .../flink/runtime/jobmaster/JobMasterTest.java | 3 +- .../runtime/scheduler/SchedulerUtilsTest.java | 3 +- ...topWithSavepointTerminationHandlerImplTest.java | 3 +- 24 files changed, 218 insertions(+), 348 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 335ea9a..5248538 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -779,6 +779,9 @@ public class CheckpointCoordinator { } } + PendingCheckpointStats pendingCheckpointStats = + trackPendingCheckpointStats(checkpointID, checkpointPlan, props, timestamp); + final PendingCheckpoint checkpoint = new PendingCheckpoint( job, @@ -789,9 +792,8 @@ public class CheckpointCoordinator { masterHooks.keySet(), props, checkpointStorageLocation, - onCompletionPromise); - - trackPendingCheckpointStats(checkpoint); + onCompletionPromise, + pendingCheckpointStats); synchronized (lock) { pendingCheckpoints.put(checkpointID, checkpoint); @@ -1122,8 +1124,7 @@ public class CheckpointCoordinator { switch (checkpoint.acknowledgeTask( message.getTaskExecutionId(), message.getSubtaskState(), - message.getCheckpointMetrics(), - getStatsCallback(checkpoint))) { + message.getCheckpointMetrics())) { case SUCCESS: LOG.debug( "Received acknowledge message for checkpoint {} from task {} of job {} at {}.", @@ -1313,7 +1314,7 @@ public class CheckpointCoordinator { checkpointsCleaner, this::scheduleTriggerRequest, executor, - getStatsCallback(pendingCheckpoint)); + statsTracker); failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID()); return completedCheckpoint; @@ -2041,7 +2042,7 @@ public class CheckpointCoordinator { checkpointsCleaner, this::scheduleTriggerRequest, executor, - getStatsCallback(pendingCheckpoint)); + statsTracker); failureManager.handleCheckpointException( pendingCheckpoint, @@ -2180,11 +2181,15 @@ public class CheckpointCoordinator { SKIP; } - private void trackPendingCheckpointStats(PendingCheckpoint checkpoint) { + private PendingCheckpointStats trackPendingCheckpointStats( + long checkpointId, + CheckpointPlan checkpointPlan, + CheckpointProperties props, + long checkpointTimestamp) { Map<JobVertexID, Integer> vertices = Stream.concat( - checkpoint.getCheckpointPlan().getTasksToWaitFor().stream(), - checkpoint.getCheckpointPlan().getFinishedTasks().stream()) + checkpointPlan.getTasksToWaitFor().stream(), + checkpointPlan.getFinishedTasks().stream()) .map(Execution::getVertex) .map(ExecutionVertex::getJobVertex) .distinct() @@ -2195,13 +2200,11 @@ public class CheckpointCoordinator { PendingCheckpointStats pendingCheckpointStats = statsTracker.reportPendingCheckpoint( - checkpoint.getCheckpointID(), - checkpoint.getCheckpointTimestamp(), - checkpoint.getProps(), - vertices); + checkpointId, checkpointTimestamp, props, vertices); + + reportFinishedTasks(pendingCheckpointStats, checkpointPlan.getFinishedTasks()); - reportFinishedTasks( - pendingCheckpointStats, checkpoint.getCheckpointPlan().getFinishedTasks()); + return pendingCheckpointStats; } private void reportFinishedTasks( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java index c691614..f10a668 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java @@ -160,12 +160,7 @@ public class CheckpointStatsTracker { Map<JobVertexID, Integer> vertexToDop) { PendingCheckpointStats pending = - new PendingCheckpointStats( - checkpointId, - triggerTimestamp, - props, - vertexToDop, - PendingCheckpointStatsCallback.proxyFor(this)); + new PendingCheckpointStats(checkpointId, triggerTimestamp, props, vertexToDop); statsReadWriteLock.lock(); try { @@ -204,7 +199,7 @@ public class CheckpointStatsTracker { * * @param completed The completed checkpoint stats. */ - private void reportCompletedCheckpoint(CompletedCheckpointStats completed) { + void reportCompletedCheckpoint(CompletedCheckpointStats completed) { statsReadWriteLock.lock(); try { latestCompletedCheckpoint = completed; @@ -225,7 +220,7 @@ public class CheckpointStatsTracker { * * @param failed The failed checkpoint stats. */ - private void reportFailedCheckpoint(FailedCheckpointStats failed) { + void reportFailedCheckpoint(FailedCheckpointStats failed) { statsReadWriteLock.lock(); try { counts.incrementFailedCheckpoints(); @@ -276,38 +271,6 @@ public class CheckpointStatsTracker { } } - /** Callback for finalization of a pending checkpoint. */ - interface PendingCheckpointStatsCallback { - /** - * Report a completed checkpoint. - * - * @param completed The completed checkpoint. - */ - void reportCompletedCheckpoint(CompletedCheckpointStats completed); - - /** - * Report a failed checkpoint. - * - * @param failed The failed checkpoint. - */ - void reportFailedCheckpoint(FailedCheckpointStats failed); - - static PendingCheckpointStatsCallback proxyFor( - CheckpointStatsTracker checkpointStatsTracker) { - return new PendingCheckpointStatsCallback() { - @Override - public void reportCompletedCheckpoint(CompletedCheckpointStats completed) { - checkpointStatsTracker.reportCompletedCheckpoint(completed); - } - - @Override - public void reportFailedCheckpoint(FailedCheckpointStats failed) { - checkpointStatsTracker.reportFailedCheckpoint(failed); - } - }; - } - } - // ------------------------------------------------------------------------ // Metrics // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java index 7d27d50..cc97510 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java @@ -215,7 +215,8 @@ public class Checkpoints { checkpointProperties, restoreMode == RestoreMode.CLAIM ? new ClaimModeCompletedStorageLocation(location) - : location); + : location, + null); } private static void throwNonRestoredStateException( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index 1972427..841e543 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -104,8 +104,8 @@ public class CompletedCheckpoint implements Serializable, Checkpoint { /** External pointer to the completed checkpoint (for example file path). */ private final String externalPointer; - /** Optional stats tracker callback for discard. */ - @Nullable private transient volatile CompletedCheckpointStats.DiscardCallback discardCallback; + /** Completed statistic for managing discard marker. */ + @Nullable private final transient CompletedCheckpointStats completedCheckpointStats; // ------------------------------------------------------------------------ @@ -117,7 +117,8 @@ public class CompletedCheckpoint implements Serializable, Checkpoint { Map<OperatorID, OperatorState> operatorStates, @Nullable Collection<MasterState> masterHookStates, CheckpointProperties props, - CompletedCheckpointStorageLocation storageLocation) { + CompletedCheckpointStorageLocation storageLocation, + @Nullable CompletedCheckpointStats completedCheckpointStats) { checkArgument(checkpointID >= 0); checkArgument(timestamp >= 0); @@ -140,6 +141,7 @@ public class CompletedCheckpoint implements Serializable, Checkpoint { this.storageLocation = checkNotNull(storageLocation); this.metadataHandle = storageLocation.getMetadataHandle(); this.externalPointer = storageLocation.getExternalPointer(); + this.completedCheckpointStats = completedCheckpointStats; } // ------------------------------------------------------------------------ @@ -274,14 +276,19 @@ public class CompletedCheckpoint implements Serializable, Checkpoint { } finally { operatorStates.clear(); - // to be null-pointer safe, copy reference to stack - CompletedCheckpointStats.DiscardCallback discardCallback = this.discardCallback; - if (discardCallback != null) { - discardCallback.notifyDiscardedCheckpoint(); + if (completedCheckpointStats != null) { + completedCheckpointStats.discard(); } } } + /** NOT Thread safe. This method can be called only from CheckpointCoordinator thread. */ + public void markAsDiscarded() { + if (completedCheckpointStats != null) { + completedCheckpointStats.discard(); + } + } + public boolean shouldBeDiscardedOnSubsume() { return props.discardOnSubsumed(); } @@ -320,13 +327,9 @@ public class CompletedCheckpoint implements Serializable, Checkpoint { return firstInterestingFields.equals(secondInterestingFields); } - /** - * Sets the callback for tracking when this checkpoint is discarded. - * - * @param discardCallback Callback to call when the checkpoint is discarded. - */ - void setDiscardCallback(@Nullable CompletedCheckpointStats.DiscardCallback discardCallback) { - this.discardCallback = discardCallback; + @Nullable + public CompletedCheckpointStats getStatistic() { + return completedCheckpointStats; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java index 38a5c9f..88339ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java @@ -183,28 +183,10 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats { } /** - * Returns the callback for the {@link CompletedCheckpoint}. - * - * @return Callback for the {@link CompletedCheckpoint}. - */ - DiscardCallback getDiscardCallback() { - return new DiscardCallback(); - } - - /** - * Callback for the {@link CompletedCheckpoint} instance to notify about disposal of the - * checkpoint (most commonly when the checkpoint has been subsumed by a newer one). + * Mark the checkpoint has been discarded. */ - class DiscardCallback { - - /** - * Updates the discarded flag of the checkpoint stats. - * - * <p>After this notification, {@link #isDiscarded()} will return <code>true</code>. - */ - void notifyDiscardedCheckpoint() { - discarded = true; - } + void discard() { + discarded = true; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java index f9d3a0d..43a2a1d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.checkpoint; -import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.PendingCheckpointStatsCallback; import org.apache.flink.runtime.jobgraph.JobVertexID; import javax.annotation.Nullable; @@ -82,7 +81,6 @@ public class FailedCheckpointStats extends PendingCheckpointStats { totalSubtaskCount, numAcknowledgedSubtasks, taskStats, - FAILING_REPORT_CALLBACK, checkpointedSize, stateSize, processedData, @@ -122,19 +120,4 @@ public class FailedCheckpointStats extends PendingCheckpointStats { public String getFailureMessage() { return failureMsg; } - - private static final PendingCheckpointStatsCallback FAILING_REPORT_CALLBACK = - new PendingCheckpointStatsCallback() { - @Override - public void reportCompletedCheckpoint(CompletedCheckpointStats completed) { - throw new UnsupportedOperationException( - "Failed checkpoint stats can't be completed"); - } - - @Override - public void reportFailedCheckpoint(FailedCheckpointStats failed) { - throw new UnsupportedOperationException( - "Failed checkpoint stats can't be failed"); - } - }; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 8521722..aee95dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -111,6 +111,8 @@ public class PendingCheckpoint implements Checkpoint { /** The promise to fulfill once the checkpoint has been completed. */ private final CompletableFuture<CompletedCheckpoint> onCompletionPromise; + @Nullable private final PendingCheckpointStats pendingCheckpointStats; + private int numAcknowledgedTasks; private boolean disposed; @@ -132,8 +134,8 @@ public class PendingCheckpoint implements Checkpoint { Collection<String> masterStateIdentifiers, CheckpointProperties props, CheckpointStorageLocation targetLocation, - CompletableFuture<CompletedCheckpoint> onCompletionPromise) { - + CompletableFuture<CompletedCheckpoint> onCompletionPromise, + @Nullable PendingCheckpointStats pendingCheckpointStats) { checkArgument( checkpointPlan.getTasksToWaitFor().size() > 0, "Checkpoint needs at least one vertex that commits the checkpoint"); @@ -163,6 +165,7 @@ public class PendingCheckpoint implements Checkpoint { : new HashSet<>(operatorCoordinatorsToConfirm); this.acknowledgedTasks = new HashSet<>(checkpointPlan.getTasksToWaitFor().size()); this.onCompletionPromise = checkNotNull(onCompletionPromise); + this.pendingCheckpointStats = pendingCheckpointStats; } // -------------------------------------------------------------------------------------------- @@ -301,7 +304,7 @@ public class PendingCheckpoint implements Checkpoint { CheckpointsCleaner checkpointsCleaner, Runnable postCleanup, Executor executor, - @Nullable PendingCheckpointStats statsCallback) + CheckpointStatsTracker statsTracker) throws IOException { synchronized (lock) { @@ -334,26 +337,24 @@ public class PendingCheckpoint implements Checkpoint { operatorStates, masterStates, props, - finalizedLocation); + finalizedLocation, + toCompletedCheckpointStats(finalizedLocation)); - onCompletionPromise.complete(completed); - - if (statsCallback != null) { + CompletedCheckpointStats completedCheckpointStats = completed.getStatistic(); + if (completedCheckpointStats != null) { LOG.trace( "Checkpoint {} size: {}Kb, duration: {}ms", checkpointId, - statsCallback.getStateSize() == 0 + completedCheckpointStats.getStateSize() == 0 ? 0 - : statsCallback.getStateSize() / 1024, - statsCallback.getEndToEndDuration()); - // Finalize the statsCallback and give the completed checkpoint a - // callback for discards. - CompletedCheckpointStats.DiscardCallback discardCallback = - statsCallback.reportCompletedCheckpoint( - finalizedLocation.getExternalPointer()); - completed.setDiscardCallback(discardCallback); + : completedCheckpointStats.getStateSize() / 1024, + completedCheckpointStats.getEndToEndDuration()); + + statsTracker.reportCompletedCheckpoint(completedCheckpointStats); } + onCompletionPromise.complete(completed); + // mark this pending checkpoint as disposed, but do NOT drop the state dispose(false, checkpointsCleaner, postCleanup, executor); @@ -366,6 +367,15 @@ public class PendingCheckpoint implements Checkpoint { } } + @Nullable + private CompletedCheckpointStats toCompletedCheckpointStats( + CompletedCheckpointStorageLocation finalizedLocation) { + return pendingCheckpointStats != null + ? pendingCheckpointStats.toCompletedCheckpointStats( + finalizedLocation.getExternalPointer()) + : null; + } + /** * Acknowledges the task with the given execution attempt id and the given subtask state. * @@ -377,8 +387,7 @@ public class PendingCheckpoint implements Checkpoint { public TaskAcknowledgeResult acknowledgeTask( ExecutionAttemptID executionAttemptId, TaskStateSnapshot operatorSubtaskStates, - CheckpointMetrics metrics, - @Nullable PendingCheckpointStats statsCallback) { + CheckpointMetrics metrics) { synchronized (lock) { if (disposed) { @@ -415,7 +424,7 @@ public class PendingCheckpoint implements Checkpoint { // publish the checkpoint statistics // to prevent null-pointers from concurrent modification, copy reference onto stack - if (statsCallback != null) { + if (pendingCheckpointStats != null) { // Do this in millis because the web frontend works with them long alignmentDurationMillis = metrics.getAlignmentDurationNanos() / 1_000_000; long checkpointStartDelayMillis = @@ -443,10 +452,12 @@ public class PendingCheckpoint implements Checkpoint { subtaskStateStats.getStateSize() == 0 ? 0 : subtaskStateStats.getStateSize() / 1024, - subtaskStateStats.getEndToEndDuration(statsCallback.getTriggerTimestamp()), + subtaskStateStats.getEndToEndDuration( + pendingCheckpointStats.getTriggerTimestamp()), subtaskStateStats.getSyncCheckpointDuration(), subtaskStateStats.getAsyncCheckpointDuration()); - statsCallback.reportSubtaskStats(vertex.getJobvertexId(), subtaskStateStats); + pendingCheckpointStats.reportSubtaskStats( + vertex.getJobvertexId(), subtaskStateStats); } return TaskAcknowledgeResult.SUCCESS; @@ -541,11 +552,11 @@ public class PendingCheckpoint implements Checkpoint { CheckpointsCleaner checkpointsCleaner, Runnable postCleanup, Executor executor, - PendingCheckpointStats statsCallback) { + CheckpointStatsTracker statsTracker) { try { failureCause = new CheckpointException(reason, cause); onCompletionPromise.completeExceptionally(failureCause); - reportFailedCheckpoint(failureCause, statsCallback); + reportFailedCheckpoint(statsTracker, failureCause); assertAbortSubsumedForced(reason); } finally { dispose(true, checkpointsCleaner, postCleanup, executor); @@ -628,11 +639,11 @@ public class PendingCheckpoint implements Checkpoint { * * @param cause The failure cause or <code>null</code>. */ - private void reportFailedCheckpoint(Exception cause, PendingCheckpointStats statsCallback) { + private void reportFailedCheckpoint(CheckpointStatsTracker statsTracker, Exception cause) { // to prevent null-pointers from concurrent modification, copy reference onto stack - if (statsCallback != null) { - long failureTimestamp = System.currentTimeMillis(); - statsCallback.reportFailedCheckpoint(failureTimestamp, cause); + if (pendingCheckpointStats != null) { + statsTracker.reportFailedCheckpoint( + pendingCheckpointStats.toFailedCheckpoint(System.currentTimeMillis(), cause)); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java index a575cf6..a8d74f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java @@ -26,7 +26,6 @@ import java.util.HashMap; import java.util.Map; import static java.util.stream.Collectors.toConcurrentMap; -import static org.apache.flink.util.Preconditions.checkNotNull; /** * Statistics for a pending checkpoint that is still in progress. @@ -45,9 +44,6 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { private static final long serialVersionUID = -973959257699390327L; - /** Tracker callback when the pending checkpoint is finalized or aborted. */ - private final transient CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback; - /** The current number of acknowledged subtasks. */ private volatile int currentNumAcknowledgedSubtasks; @@ -70,14 +66,12 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { * @param triggerTimestamp Timestamp when the checkpoint was triggered. * @param props Checkpoint properties of the checkpoint. * @param taskStats Task stats for each involved operator. - * @param trackerCallback Callback for the {@link CheckpointStatsTracker}. */ PendingCheckpointStats( long checkpointId, long triggerTimestamp, CheckpointProperties props, - Map<JobVertexID, Integer> taskStats, - CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback) { + Map<JobVertexID, Integer> taskStats) { this( checkpointId, triggerTimestamp, @@ -87,8 +81,7 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { .collect( toConcurrentMap( Map.Entry::getKey, - e -> new TaskStateStats(e.getKey(), e.getValue()))), - trackerCallback); + e -> new TaskStateStats(e.getKey(), e.getValue())))); } /** @@ -99,15 +92,13 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { * @param props Checkpoint properties of the checkpoint. * @param totalSubtaskCount Total number of subtasks for the checkpoint. * @param taskStats Task stats for each involved operator. - * @param trackerCallback Callback for the {@link CheckpointStatsTracker}. */ PendingCheckpointStats( long checkpointId, long triggerTimestamp, CheckpointProperties props, int totalSubtaskCount, - Map<JobVertexID, TaskStateStats> taskStats, - CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback) { + Map<JobVertexID, TaskStateStats> taskStats) { this( checkpointId, triggerTimestamp, @@ -115,7 +106,6 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { totalSubtaskCount, 0, taskStats, - trackerCallback, 0, 0, 0, @@ -130,7 +120,6 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { int totalSubtaskCount, int acknowledgedSubtaskCount, Map<JobVertexID, TaskStateStats> taskStats, - CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback, long currentCheckpointedSize, long currentStateSize, long processedData, @@ -138,7 +127,6 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { @Nullable SubtaskStateStats latestAcknowledgedSubtask) { super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats); - this.trackerCallback = checkNotNull(trackerCallback); this.currentCheckpointedSize = currentCheckpointedSize; this.currentStateSize = currentStateSize; this.currentProcessedData = processedData; @@ -220,31 +208,20 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { } } - /** - * Reports a successfully completed pending checkpoint. - * - * @param externalPointer Optional external storage path if checkpoint was externalized. - * @return Callback for the {@link CompletedCheckpoint} instance to notify about disposal. - */ - CompletedCheckpointStats.DiscardCallback reportCompletedCheckpoint(String externalPointer) { - CompletedCheckpointStats completed = - new CompletedCheckpointStats( - checkpointId, - triggerTimestamp, - props, - numberOfSubtasks, - new HashMap<>(taskStats), - currentNumAcknowledgedSubtasks, - currentCheckpointedSize, - currentStateSize, - currentProcessedData, - currentPersistedData, - latestAcknowledgedSubtask, - externalPointer); - - trackerCallback.reportCompletedCheckpoint(completed); - - return completed.getDiscardCallback(); + CompletedCheckpointStats toCompletedCheckpointStats(String externalPointer) { + return new CompletedCheckpointStats( + checkpointId, + triggerTimestamp, + props, + numberOfSubtasks, + new HashMap<>(taskStats), + currentNumAcknowledgedSubtasks, + currentCheckpointedSize, + currentStateSize, + currentProcessedData, + currentPersistedData, + latestAcknowledgedSubtask, + externalPointer); } /** @@ -253,24 +230,21 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { * @param failureTimestamp Timestamp of the failure. * @param cause Optional cause of the failure. */ - void reportFailedCheckpoint(long failureTimestamp, @Nullable Throwable cause) { - FailedCheckpointStats failed = - new FailedCheckpointStats( - checkpointId, - triggerTimestamp, - props, - numberOfSubtasks, - new HashMap<>(taskStats), - currentNumAcknowledgedSubtasks, - currentCheckpointedSize, - currentStateSize, - currentProcessedData, - currentPersistedData, - failureTimestamp, - latestAcknowledgedSubtask, - cause); - - trackerCallback.reportFailedCheckpoint(failed); + FailedCheckpointStats toFailedCheckpoint(long failureTimestamp, @Nullable Throwable cause) { + return new FailedCheckpointStats( + checkpointId, + triggerTimestamp, + props, + numberOfSubtasks, + new HashMap<>(taskStats), + currentNumAcknowledgedSubtasks, + currentCheckpointedSize, + currentStateSize, + currentProcessedData, + currentPersistedData, + failureTimestamp, + latestAcknowledgedSubtask, + cause); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java index 3dfebab..ce90895 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java @@ -296,7 +296,8 @@ public class CheckpointCoordinatorMasterHooksTest { masterHookStates, CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() .addJobVertex(new JobVertexID()) @@ -356,7 +357,8 @@ public class CheckpointCoordinatorMasterHooksTest { masterHookStates, CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java index 417473e..3b83973 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java @@ -773,7 +773,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { Collections.<MasterState>emptyList(), CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); // set up the coordinator and validate the initial state SharedStateRegistry sharedStateRegistry = @@ -1089,7 +1090,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { Collections.emptyList(), CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); completedCheckpointStore.addCheckpointAndSubsumeOldestOne( completedCheckpoint, new CheckpointsCleaner(), () -> {}); @@ -1131,7 +1133,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { Collections.emptyList(), CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); completedCheckpointStore.addCheckpointAndSubsumeOldestOne( completedCheckpoint, new CheckpointsCleaner(), () -> {}); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 9297c54..2cfa183 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -2821,7 +2821,8 @@ public class CheckpointCoordinatorTest extends TestLogger { Collections.<MasterState>emptyList(), CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - new TestCompletedCheckpointStorageLocation()), + new TestCompletedCheckpointStorageLocation(), + null), new CheckpointsCleaner(), () -> {}); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 44696cb..6239892 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -239,7 +239,8 @@ public class CheckpointStateRestoreTest { Collections.<MasterState>emptyList(), CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); coord.getCheckpointStore() .addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {}); @@ -269,7 +270,8 @@ public class CheckpointStateRestoreTest { Collections.<MasterState>emptyList(), CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); coord.getCheckpointStore() .addCheckpointAndSubsumeOldestOne(checkpoint, new CheckpointsCleaner(), () -> {}); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java index c11e572..8e7b0bd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java @@ -69,7 +69,7 @@ public class CheckpointStatsTrackerTest { pending.reportSubtaskStats(jobVertexID, createSubtaskStats(1)); pending.reportSubtaskStats(jobVertexID, createSubtaskStats(2)); - pending.reportCompletedCheckpoint(null); + tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null)); CheckpointStatsSnapshot snapshot = tracker.createSnapshot(); // History should be empty @@ -118,7 +118,7 @@ public class CheckpointStatsTrackerTest { completed1.reportSubtaskStats(jobVertexID, createSubtaskStats(1)); completed1.reportSubtaskStats(jobVertexID, createSubtaskStats(2)); - completed1.reportCompletedCheckpoint(null); + tracker.reportCompletedCheckpoint(completed1.toCompletedCheckpointStats(null)); // Failed checkpoint PendingCheckpointStats failed = @@ -129,7 +129,7 @@ public class CheckpointStatsTrackerTest { CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), vertexToDop); - failed.reportFailedCheckpoint(12, null); + tracker.reportFailedCheckpoint(failed.toFailedCheckpoint(12, null)); // Completed savepoint PendingCheckpointStats savepoint = @@ -143,7 +143,7 @@ public class CheckpointStatsTrackerTest { savepoint.reportSubtaskStats(jobVertexID, createSubtaskStats(1)); savepoint.reportSubtaskStats(jobVertexID, createSubtaskStats(2)); - savepoint.reportCompletedCheckpoint(null); + tracker.reportCompletedCheckpoint(savepoint.toCompletedCheckpointStats(null)); // In Progress PendingCheckpointStats inProgress = @@ -242,7 +242,7 @@ public class CheckpointStatsTrackerTest { assertEquals(snapshot2, tracker.createSnapshot()); // Complete checkpoint => new snapshot - pending.reportCompletedCheckpoint(null); + tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null)); CheckpointStatsSnapshot snapshot3 = tracker.createSnapshot(); assertNotEquals(snapshot2, snapshot3); @@ -422,7 +422,7 @@ public class CheckpointStatsTrackerTest { assertTrue(pending.reportSubtaskStats(jobVertexID, subtaskStats)); - pending.reportCompletedCheckpoint(externalPath); + stats.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(externalPath)); // Verify completed checkpoint updated assertEquals(Long.valueOf(1), numCheckpoints.getValue()); @@ -446,7 +446,7 @@ public class CheckpointStatsTrackerTest { singletonMap(jobVertexID, 1)); long failureTimestamp = 1230123L; - nextPending.reportFailedCheckpoint(failureTimestamp, null); + stats.reportFailedCheckpoint(nextPending.toFailedCheckpoint(failureTimestamp, null)); // Verify updated assertEquals(Long.valueOf(2), numCheckpoints.getValue()); @@ -482,7 +482,7 @@ public class CheckpointStatsTrackerTest { singletonMap(jobVertexID, 1)); thirdPending.reportSubtaskStats(jobVertexID, subtaskStats); - thirdPending.reportCompletedCheckpoint(null); + stats.reportCompletedCheckpoint(thirdPending.toCompletedCheckpointStats(null)); // Verify external path is "n/a", because internal checkpoint won't generate external path. assertEquals("n/a", latestCompletedExternalPath.getValue()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java index 122773a..62e5479 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java @@ -307,7 +307,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { operatorGroupState, null, props, - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java index c7cad82..3a29452 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java @@ -63,7 +63,8 @@ public class CompletedCheckpointTest { Collections.emptyList(), CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.RETAIN_ON_FAILURE), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); CompletedCheckpoint checkpoint2 = new CompletedCheckpoint( @@ -75,7 +76,8 @@ public class CompletedCheckpointTest { Collections.emptyList(), CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.RETAIN_ON_FAILURE), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); List<CompletedCheckpoint> checkpoints1 = new ArrayList<>(); checkpoints1.add(checkpoint1); @@ -103,7 +105,8 @@ public class CompletedCheckpointTest { Collections.emptyList(), CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.RETAIN_ON_FAILURE), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); CompletedCheckpoint checkpoint2 = new CompletedCheckpoint( @@ -115,7 +118,8 @@ public class CompletedCheckpointTest { Collections.emptyList(), CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.RETAIN_ON_FAILURE), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); List<CompletedCheckpoint> checkpoints1 = new ArrayList<>(); checkpoints1.add(checkpoint1); @@ -145,7 +149,8 @@ public class CompletedCheckpointTest { Collections.emptyList(), CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.RETAIN_ON_FAILURE), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); CompletedCheckpoint checkpoint2 = new CompletedCheckpoint( @@ -157,7 +162,8 @@ public class CompletedCheckpointTest { Collections.emptyList(), CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.RETAIN_ON_FAILURE), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); List<CompletedCheckpoint> checkpoints1 = new ArrayList<>(); checkpoints1.add(checkpoint1); @@ -184,7 +190,8 @@ public class CompletedCheckpointTest { Collections.emptyList(), CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.RETAIN_ON_FAILURE), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); CompletedCheckpoint checkpoint2 = new CompletedCheckpoint( @@ -196,7 +203,8 @@ public class CompletedCheckpointTest { Collections.emptyList(), CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.RETAIN_ON_FAILURE), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); List<CompletedCheckpoint> checkpoints1 = new ArrayList<>(); checkpoints1.add(checkpoint1); @@ -223,7 +231,8 @@ public class CompletedCheckpointTest { Collections.emptyList(), CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.RETAIN_ON_FAILURE), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl(); checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry); @@ -254,7 +263,8 @@ public class CompletedCheckpointTest { operatorStates, Collections.emptyList(), props, - location); + location, + null); SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl(); checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry); @@ -306,7 +316,8 @@ public class CompletedCheckpointTest { new HashMap<>(operatorStates), Collections.emptyList(), retainProps, - retainedLocation); + retainedLocation, + null); checkpoint.discardOnShutdown(status); @@ -333,7 +344,8 @@ public class CompletedCheckpointTest { new HashMap<>(operatorStates), Collections.emptyList(), discardProps, - discardLocation); + discardLocation, + null); checkpoint.discardOnShutdown(status); @@ -346,6 +358,23 @@ public class CompletedCheckpointTest { /** Tests that the stats callbacks happen if the callback is registered. */ @Test public void testCompletedCheckpointStatsCallbacks() throws Exception { + Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>(); + JobVertexID jobVertexId = new JobVertexID(); + taskStats.put(jobVertexId, new TaskStateStats(jobVertexId, 1)); + CompletedCheckpointStats checkpointStats = + new CompletedCheckpointStats( + 1, + 0, + CheckpointProperties.forCheckpoint( + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), + 1, + taskStats, + 1, + 1, + 1, + 1, + mock(SubtaskStateStats.class), + null); CompletedCheckpoint completed = new CompletedCheckpoint( new JobID(), @@ -356,14 +385,11 @@ public class CompletedCheckpointTest { Collections.emptyList(), CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - new TestCompletedCheckpointStorageLocation()); - - CompletedCheckpointStats.DiscardCallback callback = - mock(CompletedCheckpointStats.DiscardCallback.class); - completed.setDiscardCallback(callback); + new TestCompletedCheckpointStorageLocation(), + checkpointStats); completed.discardOnShutdown(JobStatus.FINISHED); - verify(callback, times(1)).notifyDiscardedCheckpoint(); + assertTrue(checkpointStats.isDiscarded()); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java index ed36db9..064e26a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java @@ -418,7 +418,8 @@ public class DefaultCompletedCheckpointStoreTest extends TestLogger { Collections.emptyMap(), Collections.emptyList(), props, - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); } private List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> createStateHandles( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java index d7abd6e..82db6ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java @@ -58,7 +58,8 @@ public class DefaultCompletedCheckpointStoreUtilsTest extends TestLogger { new HashMap<>(), Collections.emptyList(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); } private static class FailingRetrievableStateHandle<T extends Serializable> diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java index 37680ba..e46f608 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java @@ -52,17 +52,9 @@ public class PendingCheckpointStatsTest { taskStats.put(task1.getJobVertexId(), task1); taskStats.put(task2.getJobVertexId(), task2); - CheckpointStatsTracker.PendingCheckpointStatsCallback callback = - mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class); - PendingCheckpointStats pending = new PendingCheckpointStats( - checkpointId, - triggerTimestamp, - props, - totalSubtaskCount, - taskStats, - callback); + checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats); // Check initial state assertEquals(checkpointId, pending.getCheckpointId()); @@ -129,8 +121,7 @@ public class PendingCheckpointStatsTest { taskStats.put(task1.getJobVertexId(), task1); taskStats.put(task2.getJobVertexId(), task2); - CheckpointStatsTracker.PendingCheckpointStatsCallback callback = - mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class); + CheckpointStatsTracker callback = mock(CheckpointStatsTracker.class); PendingCheckpointStats pending = new PendingCheckpointStats( @@ -139,8 +130,7 @@ public class PendingCheckpointStatsTest { CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks(), - taskStats, - callback); + taskStats); // Report subtasks for (int i = 0; i < task1.getNumberOfSubtasks(); i++) { @@ -154,8 +144,7 @@ public class PendingCheckpointStatsTest { // Report completed String externalPath = "asdjkasdjkasd"; - CompletedCheckpointStats.DiscardCallback discardCallback = - pending.reportCompletedCheckpoint(externalPath); + callback.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(externalPath)); ArgumentCaptor<CompletedCheckpointStats> args = ArgumentCaptor.forClass(CompletedCheckpointStats.class); @@ -166,7 +155,7 @@ public class PendingCheckpointStatsTest { assertNotNull(completed); assertEquals(CheckpointStatsStatus.COMPLETED, completed.getStatus()); assertFalse(completed.isDiscarded()); - discardCallback.notifyDiscardedCheckpoint(); + completed.discard(); assertTrue(completed.isDiscarded()); assertEquals(externalPath, completed.getExternalPath()); @@ -194,8 +183,7 @@ public class PendingCheckpointStatsTest { taskStats.put(task1.getJobVertexId(), task1); taskStats.put(task2.getJobVertexId(), task2); - CheckpointStatsTracker.PendingCheckpointStatsCallback callback = - mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class); + CheckpointStatsTracker callback = mock(CheckpointStatsTracker.class); long triggerTimestamp = 123123; PendingCheckpointStats pending = @@ -205,8 +193,7 @@ public class PendingCheckpointStatsTest { CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks(), - taskStats, - callback); + taskStats); // Report subtasks for (int i = 0; i < task1.getNumberOfSubtasks(); i++) { @@ -220,7 +207,7 @@ public class PendingCheckpointStatsTest { // Report failed Exception cause = new Exception("test exception"); long failureTimestamp = 112211137; - pending.reportFailedCheckpoint(failureTimestamp, cause); + callback.reportFailedCheckpoint(pending.toFailedCheckpoint(failureTimestamp, cause)); ArgumentCaptor<FailedCheckpointStats> args = ArgumentCaptor.forClass(FailedCheckpointStats.class); @@ -263,8 +250,7 @@ public class PendingCheckpointStatsTest { CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), 1337, - taskStats, - mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class)); + taskStats); PendingCheckpointStats copy = CommonTestUtils.createCopySerializable(pending); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 05111d6..294c6fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -31,7 +31,6 @@ import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.OperatorInfo; import org.apache.flink.runtime.operators.coordination.TestingOperatorInfo; @@ -74,9 +73,7 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -218,7 +215,7 @@ public class PendingCheckpointTest { future = pending.getCompletionFuture(); assertFalse(future.isDone()); - pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null); + pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); assertTrue(pending.areTasksFullyAcknowledged()); pending.finalizeCheckpoint( new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null); @@ -292,75 +289,6 @@ public class PendingCheckpointTest { verify(state, times(1)).discardState(); } - /** Tests that the stats callbacks happen if the callback is registered. */ - @Test - public void testPendingCheckpointStatsCallbacks() throws Exception { - { - // Complete successfully - PendingCheckpointStats callback = mock(PendingCheckpointStats.class); - PendingCheckpoint pending = - createPendingCheckpoint( - CheckpointProperties.forCheckpoint( - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); - - pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), callback); - verify(callback, times(1)) - .reportSubtaskStats(nullable(JobVertexID.class), any(SubtaskStateStats.class)); - - pending.finalizeCheckpoint( - new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), callback); - verify(callback, times(1)).reportCompletedCheckpoint(any(String.class)); - } - - { - // Fail subsumed - PendingCheckpointStats callback = mock(PendingCheckpointStats.class); - PendingCheckpoint pending = - createPendingCheckpoint( - CheckpointProperties.forCheckpoint( - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); - - abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, callback); - verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class)); - } - - { - // Fail subsumed - PendingCheckpointStats callback = mock(PendingCheckpointStats.class); - PendingCheckpoint pending = - createPendingCheckpoint( - CheckpointProperties.forCheckpoint( - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); - - abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED, callback); - verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class)); - } - - { - // Fail subsumed - PendingCheckpointStats callback = mock(PendingCheckpointStats.class); - PendingCheckpoint pending = - createPendingCheckpoint( - CheckpointProperties.forCheckpoint( - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); - - abort(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED, callback); - verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class)); - } - - { - // Fail subsumed - PendingCheckpointStats callback = mock(PendingCheckpointStats.class); - PendingCheckpoint pending = - createPendingCheckpoint( - CheckpointProperties.forCheckpoint( - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); - - abort(pending, CheckpointFailureReason.CHECKPOINT_EXPIRED, callback); - verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class)); - } - } - /** * FLINK-5985. * @@ -373,7 +301,7 @@ public class PendingCheckpointTest { createPendingCheckpoint( CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); - pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class), null); + pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class)); final OperatorState expectedState = new OperatorState(OPERATOR_ID, PARALLELISM, MAX_PARALLELISM); Assert.assertEquals( @@ -394,7 +322,7 @@ public class PendingCheckpointTest { CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); pending.acknowledgeTask( - ATTEMPT_ID, mock(TaskStateSnapshot.class), mock(CheckpointMetrics.class), null); + ATTEMPT_ID, mock(TaskStateSnapshot.class), mock(CheckpointMetrics.class)); Assert.assertFalse(pending.getOperatorStates().isEmpty()); } @@ -441,7 +369,7 @@ public class PendingCheckpointTest { assertTrue(pending.areMasterStatesFullyAcknowledged()); assertFalse(pending.areTasksFullyAcknowledged()); - pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null); + pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); assertTrue(pending.areTasksFullyAcknowledged()); final List<MasterState> resultMasterStates = pending.getMasterStates(); @@ -494,7 +422,7 @@ public class PendingCheckpointTest { assertTrue(pending.areMasterStatesFullyAcknowledged()); assertFalse(pending.areTasksFullyAcknowledged()); - pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null); + pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); assertTrue(pending.areTasksFullyAcknowledged()); final List<MasterState> resultMasterStates = pending.getMasterStates(); @@ -580,8 +508,7 @@ public class PendingCheckpointTest { checkpoint.acknowledgeTask( ACK_TASKS.get(0).getAttemptId(), TaskStateSnapshot.FINISHED_ON_RESTORE, - new CheckpointMetrics(), - null); + new CheckpointMetrics()); assertThat( recordCheckpointPlan.getReportedFinishedOnRestoreTasks(), contains(ACK_TASKS.get(0).getVertex())); @@ -595,8 +522,7 @@ public class PendingCheckpointTest { checkpoint.acknowledgeTask( ACK_TASKS.get(0).getAttemptId(), new TaskStateSnapshot(10, true), - new CheckpointMetrics(), - null); + new CheckpointMetrics()); assertThat( recordCheckpointPlan.getReportedOperatorsFinishedTasks(), contains(ACK_TASKS.get(0).getVertex())); @@ -648,7 +574,7 @@ public class PendingCheckpointTest { Collections.emptyList(), Executors.directExecutor()); - checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null); + checkpoint.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics()); return checkpoint; } @@ -718,7 +644,8 @@ public class PendingCheckpointTest { masterStateIdentifiers, props, location, - new CompletableFuture<>()); + new CompletableFuture<>(), + null); } @SuppressWarnings("unchecked") @@ -741,12 +668,7 @@ public class PendingCheckpointTest { CheckpointFailureReason reason, PendingCheckpointStats statsCallback) { checkpoint.abort( - reason, - null, - new CheckpointsCleaner(), - () -> {}, - Executors.directExecutor(), - statsCallback); + reason, null, new CheckpointsCleaner(), () -> {}, Executors.directExecutor(), null); } private static final class QueueExecutor implements Executor { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java index d36b321..9a584c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java @@ -107,7 +107,8 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS Collections.emptyMap(), Collections.emptyList(), CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION), - new TestCompletedCheckpointStorageLocation()) { + new TestCompletedCheckpointStorageLocation(), + null) { @Override public boolean discardOnSubsume() { discardAttempted.countDown(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index ee4f46c..804d285 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -254,7 +254,8 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { Collections.emptyMap(), Collections.emptyList(), CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); // shouldn't fail despite the exception store.addCheckpointAndSubsumeOldestOne( checkpointToAdd, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 50bac5c..50a9700 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -880,7 +880,8 @@ public class JobMasterTest extends TestLogger { null, CheckpointProperties.forCheckpoint( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - new DummyCheckpointStorageLocation()); + new DummyCheckpointStorageLocation(), + null); final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java index 641a918..dfc3b40 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java @@ -150,7 +150,8 @@ public class SchedulerUtilsTest extends TestLogger { singletonMap(operatorID, operatorState), emptyList(), CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION), - new TestCompletedCheckpointStorageLocation()); + new TestCompletedCheckpointStorageLocation(), + null); } private IncrementalRemoteKeyedStateHandle buildIncrementalHandle( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java index f6db13c..cab4abe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java @@ -223,6 +223,7 @@ public class StopWithSavepointTerminationHandlerImplTest extends TestLogger { new HashMap<>(), null, CheckpointProperties.forSavepoint(true, SavepointFormatType.CANONICAL), - new TestCompletedCheckpointStorageLocation(streamStateHandle, "savepoint-path")); + new TestCompletedCheckpointStorageLocation(streamStateHandle, "savepoint-path"), + null); } }
