This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 90dd45bfb04324ce559e58d9362ea561698b5ff3 Author: Stephan Ewen <[email protected]> AuthorDate: Sun Dec 8 15:17:23 2019 +0100 [hotfix][runtime] Fix checkstyle in org.apache.flink.runtime.checkpoint (main scope) Test scope is not included in this fix. --- .../runtime/checkpoint/AbstractCheckpointStats.java | 1 + .../checkpoint/CheckpointCoordinatorGateway.java | 3 +++ .../runtime/checkpoint/CheckpointFailureManager.java | 2 +- .../flink/runtime/checkpoint/CheckpointMetaData.java | 4 ++-- .../flink/runtime/checkpoint/CheckpointMetrics.java | 10 +++++----- .../runtime/checkpoint/CheckpointRetentionPolicy.java | 2 +- .../runtime/checkpoint/CheckpointStatsHistory.java | 10 +++++----- .../runtime/checkpoint/CheckpointStatsSnapshot.java | 2 +- .../runtime/checkpoint/CompletedCheckpointStore.java | 2 +- .../DefaultLastStateConnectionStateListener.java | 3 +++ .../runtime/checkpoint/FailedCheckpointStats.java | 1 + .../apache/flink/runtime/checkpoint/OperatorState.java | 16 ++++++++-------- .../flink/runtime/checkpoint/OperatorSubtaskState.java | 2 +- .../runtime/checkpoint/PendingCheckpointStats.java | 3 ++- .../checkpoint/PrioritizedOperatorSubtaskState.java | 7 +++++-- .../runtime/checkpoint/StateAssignmentOperation.java | 8 ++++---- .../runtime/checkpoint/StateObjectCollection.java | 1 + .../apache/flink/runtime/checkpoint/SubtaskState.java | 4 ++-- .../org/apache/flink/runtime/checkpoint/TaskState.java | 11 +++++------ .../flink/runtime/checkpoint/TaskStateStats.java | 1 + .../checkpoint/ZooKeeperCheckpointRecoveryFactory.java | 3 ++- .../checkpoint/ZooKeeperCompletedCheckpointStore.java | 5 ++--- .../flink/runtime/checkpoint/hooks/MasterHooks.java | 13 +++---------- .../checkpoint/savepoint/SavepointSerializers.java | 6 +++--- .../runtime/checkpoint/savepoint/SavepointV1.java | 4 ++-- .../checkpoint/savepoint/SavepointV1Serializer.java | 7 ++++--- .../runtime/checkpoint/savepoint/SavepointV2.java | 18 +++++++++--------- .../checkpoint/savepoint/SavepointV2Serializer.java | 16 ++++++++-------- .../runtime/state/memory/ByteStreamStateHandle.java | 2 +- .../savepoint/SavepointV2SerializerTest.java | 10 +++++----- tools/maven/suppressions-runtime.xml | 6 +----- 31 files changed, 93 insertions(+), 90 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java index 5b3c7c7..f49790d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobVertexID; import javax.annotation.Nullable; + import java.io.Serializable; import java.util.Collection; import java.util.Map; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java index b8dc554..a199820 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java @@ -23,6 +23,9 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.rpc.RpcGateway; +/** + * RPC Gateway interface for messages to the CheckpointCoordinator. + */ public interface CheckpointCoordinatorGateway extends RpcGateway { void acknowledgeCheckpoint( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java index 3e73e7f..9cd8fba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java @@ -142,7 +142,7 @@ public class CheckpointFailureManager { * @param checkpointId the failed checkpoint id used to count the continuous failure number based on * checkpoint id sequence. */ - public void handleCheckpointSuccess(long checkpointId) { + public void handleCheckpointSuccess(@SuppressWarnings("unused") long checkpointId) { clearCount(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java index 9960b44..aa31885 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetaData.java @@ -27,10 +27,10 @@ public class CheckpointMetaData implements Serializable { private static final long serialVersionUID = -2387652345781312442L; - /** The ID of the checkpoint */ + /** The ID of the checkpoint. */ private final long checkpointId; - /** The timestamp of the checkpoint */ + /** The timestamp of the checkpoint. */ private final long timestamp; public CheckpointMetaData(long checkpointId, long timestamp) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java index 4307a73..d18feb8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointMetrics.java @@ -30,16 +30,16 @@ public class CheckpointMetrics implements Serializable { private static final long serialVersionUID = 1L; - /** The number of bytes that were buffered during the checkpoint alignment phase */ + /** The number of bytes that were buffered during the checkpoint alignment phase. */ private long bytesBufferedInAlignment; - /** The duration (in nanoseconds) that the stream alignment for the checkpoint took */ + /** The duration (in nanoseconds) that the stream alignment for the checkpoint took. */ private long alignmentDurationNanos; - /* The duration (in milliseconds) of the synchronous part of the operator checkpoint */ + /** The duration (in milliseconds) of the synchronous part of the operator checkpoint. */ private long syncDurationMillis; - /* The duration (in milliseconds) of the asynchronous part of the operator checkpoint */ + /** The duration (in milliseconds) of the asynchronous part of the operator checkpoint. */ private long asyncDurationMillis; private long checkpointStartDelayNanos; @@ -121,7 +121,7 @@ public class CheckpointMetrics implements Serializable { CheckpointMetrics that = (CheckpointMetrics) o; - return bytesBufferedInAlignment == that.bytesBufferedInAlignment && + return bytesBufferedInAlignment == that.bytesBufferedInAlignment && alignmentDurationNanos == that.alignmentDurationNanos && syncDurationMillis == that.syncDurationMillis && asyncDurationMillis == that.asyncDurationMillis && diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRetentionPolicy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRetentionPolicy.java index 3bd124d..01e5f93 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRetentionPolicy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRetentionPolicy.java @@ -33,5 +33,5 @@ public enum CheckpointRetentionPolicy { RETAIN_ON_FAILURE, /** Checkpoints should always be cleaned up when an application reaches a terminal state. */ - NEVER_RETAIN_AFTER_TERMINATION; + NEVER_RETAIN_AFTER_TERMINATION } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java index 9db302c..2892c87 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java @@ -92,8 +92,8 @@ public class CheckpointStatsHistory implements Serializable { false, maxSize, new AbstractCheckpointStats[0], - Collections.<AbstractCheckpointStats>emptyList(), - Collections.<Long, AbstractCheckpointStats>emptyMap(), + Collections.emptyList(), + Collections.emptyMap(), null, null, null); @@ -116,9 +116,9 @@ public class CheckpointStatsHistory implements Serializable { AbstractCheckpointStats[] checkpointArray, List<AbstractCheckpointStats> checkpointsHistory, Map<Long, AbstractCheckpointStats> checkpointsById, - CompletedCheckpointStats latestCompletedCheckpoint, - FailedCheckpointStats latestFailedCheckpoint, - CompletedCheckpointStats latestSavepoint) { + @Nullable CompletedCheckpointStats latestCompletedCheckpoint, + @Nullable FailedCheckpointStats latestFailedCheckpoint, + @Nullable CompletedCheckpointStats latestSavepoint) { this.readOnly = readOnly; checkArgument(maxSize >= 0, "Negative maximum size"); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java index 7d787c2..aa20313 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java @@ -59,7 +59,7 @@ public class CheckpointStatsSnapshot implements Serializable { @Nullable RestoredCheckpointStats latestRestoredCheckpoint) { this.counts = checkNotNull(counts); - this.summary= checkNotNull(summary); + this.summary = checkNotNull(summary); this.history = checkNotNull(history); this.latestRestoredCheckpoint = latestRestoredCheckpoint; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java index c7d51b6..42f608b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java @@ -107,7 +107,7 @@ public interface CompletedCheckpointStore { * This method returns whether the completed checkpoint store requires checkpoints to be * externalized. Externalized checkpoints have their meta data persisted, which the checkpoint * store can exploit (for example by simply pointing the persisted metadata). - * + * * @return True, if the store requires that checkpoints are externalized before being added, false * if the store stores the metadata itself. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultLastStateConnectionStateListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultLastStateConnectionStateListener.java index 8004b32..4b275e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultLastStateConnectionStateListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultLastStateConnectionStateListener.java @@ -25,6 +25,9 @@ import javax.annotation.Nullable; import java.util.Optional; +/** + * A simple ConnectionState listener that remembers the last state. + */ public class DefaultLastStateConnectionStateListener implements LastStateConnectionStateListener { @Nullable diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java index 2f596a7..1eb0131 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobVertexID; import javax.annotation.Nullable; + import java.util.Map; import static org.apache.flink.util.Preconditions.checkArgument; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java index a5f908d7..33eccf7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java @@ -37,16 +37,16 @@ public class OperatorState implements CompositeStateHandle { private static final long serialVersionUID = -4845578005863201810L; - /** id of the operator */ + /** The id of the operator. */ private final OperatorID operatorID; - /** handles to non-partitioned states, subtaskindex -> subtaskstate */ + /** The handles to states created by the parallel tasks: subtaskIndex -> subtaskstate. */ private final Map<Integer, OperatorSubtaskState> operatorSubtaskStates; - /** parallelism of the operator when it was checkpointed */ + /** The parallelism of the operator when it was checkpointed. */ private final int parallelism; - /** maximum parallelism of the operator when the job was first created */ + /** The maximum parallelism (for number of keygroups) of the operator when the job was first created. */ private final int maxParallelism; public OperatorState(OperatorID operatorID, int parallelism, int maxParallelism) { @@ -86,6 +86,10 @@ public class OperatorState implements CompositeStateHandle { } } + public Map<Integer, OperatorSubtaskState> getSubtaskStates() { + return Collections.unmodifiableMap(operatorSubtaskStates); + } + public Collection<OperatorSubtaskState> getStates() { return operatorSubtaskStates.values(); } @@ -148,10 +152,6 @@ public class OperatorState implements CompositeStateHandle { return parallelism + 31 * Objects.hash(operatorID, operatorSubtaskStates); } - public Map<Integer, OperatorSubtaskState> getSubtaskStates() { - return Collections.unmodifiableMap(operatorSubtaskStates); - } - @Override public String toString() { // KvStates are always null in 1.1. Don't print this as it might diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java index c2b8e9a..87ab061 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.state.CompositeStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java index 9b21747..323979d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobVertexID; import javax.annotation.Nullable; + import java.util.HashMap; import java.util.Map; @@ -45,7 +46,7 @@ public class PendingCheckpointStats extends AbstractCheckpointStats { private static final long serialVersionUID = -973959257699390327L; /** Tracker callback when the pending checkpoint is finalized or aborted. */ - private transient final CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback; + private final transient CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback; /** The current number of acknowledged subtasks. */ private volatile int currentNumAcknowledgedSubtasks; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java index 52b30a1..0a949ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java @@ -18,12 +18,13 @@ package org.apache.flink.runtime.checkpoint; -import org.apache.commons.lang3.BooleanUtils; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateObject; +import org.apache.commons.lang3.BooleanUtils; + import javax.annotation.Nonnull; import java.util.ArrayList; @@ -162,7 +163,6 @@ public class PrioritizedOperatorSubtaskState { return restored; } - private static <T extends StateObject> StateObjectCollection<T> lastElement(List<StateObjectCollection<T>> list) { return list.get(list.size() - 1); } @@ -174,6 +174,9 @@ public class PrioritizedOperatorSubtaskState { return EMPTY_NON_RESTORED_INSTANCE; } + /** + * A builder for PrioritizedOperatorSubtaskState. + */ @Internal public static class Builder { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java index ac7a9d8..8e7b355 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java @@ -388,7 +388,7 @@ public class StateAssignmentOperation { /** * Collect {@link KeyGroupsStateHandle managedKeyedStateHandles} which have intersection with given - * {@link KeyGroupRange} from {@link TaskState operatorState} + * {@link KeyGroupRange} from {@link TaskState operatorState}. * * @param operatorState all state handles of a operator * @param subtaskKeyGroupRange the KeyGroupRange of a subtask @@ -423,7 +423,7 @@ public class StateAssignmentOperation { /** * Collect {@link KeyGroupsStateHandle rawKeyedStateHandles} which have intersection with given - * {@link KeyGroupRange} from {@link TaskState operatorState} + * {@link KeyGroupRange} from {@link TaskState operatorState}. * * @param operatorState all state handles of a operator * @param subtaskKeyGroupRange the KeyGroupRange of a subtask @@ -481,8 +481,8 @@ public class StateAssignmentOperation { * Groups the available set of key groups into key group partitions. A key group partition is * the set of key groups which is assigned to the same task. Each set of the returned list * constitutes a key group partition. - * <p> - * <b>IMPORTANT</b>: The assignment of key groups to partitions has to be in sync with the + * + * <p><b>IMPORTANT</b>: The assignment of key groups to partitions has to be in sync with the * KeyGroupStreamPartitioner. * * @param numberKeyGroups Number of available key groups (indexed from 0 to numberKeyGroups - 1) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java index 3076847..9d4c32b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java @@ -191,6 +191,7 @@ public class StateObjectCollection<T extends StateObject> implements Collection< // Helper methods. // ------------------------------------------------------------------------ + @SuppressWarnings("unchecked") public static <T extends StateObject> StateObjectCollection<T> empty() { return (StateObjectCollection<T>) EMPTY; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java index 5aab33a..1407fc4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java @@ -20,8 +20,8 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.CompositeStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; @@ -90,7 +90,7 @@ public class SubtaskState implements CompositeStateHandle { } } - private static final long getSizeNullSafe(StateObject stateObject) throws Exception { + private static long getSizeNullSafe(StateObject stateObject) throws Exception { return stateObject != null ? stateObject.getStateSize() : 0L; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java index 0f3bedb..e2c0bf2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java @@ -33,28 +33,27 @@ import java.util.Objects; * Simple container class which contains the task state and key-group state handles for the sub * tasks of a {@link org.apache.flink.runtime.jobgraph.JobVertex}. * - * This class basically groups all non-partitioned state and key-group state belonging to the same job vertex together. + * <p>This class basically groups all non-partitioned state and key-group state belonging to the same job vertex together. * * @deprecated Internal class for savepoint backwards compatibility. Don't use for other purposes. */ @Deprecated -@SuppressWarnings("deprecation") public class TaskState implements CompositeStateHandle { private static final long serialVersionUID = -4845578005863201810L; private final JobVertexID jobVertexID; - /** handles to non-partitioned states, subtaskindex -> subtaskstate */ + /** handles to non-partitioned states, subtaskindex -> subtaskstate. */ private final Map<Integer, SubtaskState> subtaskStates; - /** parallelism of the operator when it was checkpointed */ + /** parallelism of the operator when it was checkpointed. */ private final int parallelism; - /** maximum parallelism of the operator when the job was first created */ + /** maximum parallelism of the operator when the job was first created. */ private final int maxParallelism; - /** length of the operator chain */ + /** length of the operator chain. */ private final int chainLength; public TaskState(JobVertexID jobVertexID, int parallelism, int maxParallelism, int chainLength) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java index 084d7dd..b1e2f99 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobVertexID; import javax.annotation.Nullable; + import java.io.Serializable; import static org.apache.flink.util.Preconditions.checkArgument; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java index 069cce5..4311b68 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java @@ -18,12 +18,13 @@ package org.apache.flink.runtime.checkpoint; -import org.apache.curator.framework.CuratorFramework; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.curator.framework.CuratorFramework; + import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkNotNull; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 10083fc..b4f72b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -169,7 +169,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle : initialCheckpoints) { - CompletedCheckpoint completedCheckpoint = null; + CompletedCheckpoint completedCheckpoint; try { completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle); @@ -245,8 +245,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto @Override public List<CompletedCheckpoint> getAllCheckpoints() throws Exception { - List<CompletedCheckpoint> checkpoints = new ArrayList<>(completedCheckpoints); - return checkpoints; + return new ArrayList<>(completedCheckpoints); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java index 3a16900..e9ffbe3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java @@ -58,7 +58,7 @@ public class MasterHooks { */ public static void reset( final Collection<MasterTriggerRestoreHook<?>> hooks, - final Logger log) throws FlinkException { + @SuppressWarnings("unused") final Logger log) throws FlinkException { for (MasterTriggerRestoreHook<?> hook : hooks) { final String id = hook.getIdentifier(); @@ -76,12 +76,10 @@ public class MasterHooks { * Closes the master hooks. * * @param hooks The hooks to close - * - * @throws FlinkException Thrown, if the hooks throw an exception. */ public static void close( final Collection<MasterTriggerRestoreHook<?>> hooks, - final Logger log) throws FlinkException { + final Logger log) { for (MasterTriggerRestoreHook<?> hook : hooks) { try { @@ -320,12 +318,7 @@ public class MasterHooks { @Nullable @Override public CompletableFuture<T> triggerCheckpoint(long checkpointId, long timestamp, final Executor executor) throws Exception { - final Executor wrappedExecutor = new Executor() { - @Override - public void execute(Runnable command) { - executor.execute(new WrappedCommand(userClassLoader, command)); - } - }; + final Executor wrappedExecutor = command -> executor.execute(new WrappedCommand(userClassLoader, command)); return LambdaUtil.withContextClassLoader( userClassLoader, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java index 12e9c5b..25b63b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java @@ -29,8 +29,8 @@ import java.util.Map; */ public class SavepointSerializers { - /** If this flag is true, restoring a savepoint fails if it contains legacy state (<= Flink 1.1 format) */ - static boolean FAIL_WHEN_LEGACY_STATE_DETECTED = true; + /** If this flag is true, restoring a savepoint fails if it contains legacy state (<= Flink 1.1 format). */ + static boolean failWhenLegacyStateDetected = true; private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(2); @@ -87,6 +87,6 @@ public class SavepointSerializers { */ @VisibleForTesting public static void setFailWhenLegacyStateDetected(boolean fail) { - FAIL_WHEN_LEGACY_STATE_DETECTED = fail; + failWhenLegacyStateDetected = fail; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java index daf5b7f..69e7695 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java @@ -35,10 +35,10 @@ public class SavepointV1 implements Savepoint { /** The savepoint version. */ public static final int VERSION = 1; - /** The checkpoint ID */ + /** The checkpoint ID. */ private final long checkpointId; - /** The task states */ + /** The task states. */ private final Collection<TaskState> taskStates; public SavepointV1(long checkpointId, Collection<TaskState> taskStates) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java index b3e6e89..e56d4be 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java @@ -25,11 +25,11 @@ import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.checkpoint.TaskState; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; @@ -53,6 +53,7 @@ import java.util.Map; * classes to stay the same. */ @Internal +@SuppressWarnings("deprecation") public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { private static final byte NULL_HANDLE = 0; @@ -164,11 +165,11 @@ public class SavepointV1Serializer implements SavepointSerializer<SavepointV2> { private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException { // Duration field has been removed from SubtaskState - long ignoredDuration = dis.readLong(); + dis.readLong(); int len = dis.readInt(); - if (SavepointSerializers.FAIL_WHEN_LEGACY_STATE_DETECTED) { + if (SavepointSerializers.failWhenLegacyStateDetected) { Preconditions.checkState(len == 0, "Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is " + "no longer supported starting from Flink 1.4. Please rewrite your job to use " + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java index 466917b..6dc628d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java @@ -27,8 +27,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.util.Preconditions; import java.util.Collection; @@ -48,27 +48,27 @@ public class SavepointV2 implements Savepoint { /** The savepoint version. */ public static final int VERSION = 2; - /** The checkpoint ID */ + /** The checkpoint ID. */ private final long checkpointId; /** - * The task states - * @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future. + * The task states. + * @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future. */ @Deprecated private final Collection<TaskState> taskStates; - /** The operator states */ + /** The operator states. */ private final Collection<OperatorState> operatorStates; - /** The states generated by the CheckpointCoordinator */ + /** The states generated by the CheckpointCoordinator. */ private final Collection<MasterState> masterStates; /** @deprecated Only kept for backwards-compatibility with versions < 1.3. Will be removed in the future. */ @Deprecated public SavepointV2(long checkpointId, Collection<TaskState> taskStates) { this( - checkpointId, + checkpointId, null, checkNotNull(taskStates, "taskStates"), Collections.<MasterState>emptyList() @@ -180,13 +180,13 @@ public class SavepointV2 implements Savepoint { Preconditions.checkArgument( jobVertex.getParallelism() == taskState.getParallelism(), - "Detected change in parallelism during migration for task " + jobVertex.getJobVertexId() +"." + + "Detected change in parallelism during migration for task " + jobVertex.getJobVertexId() + "." + "When migrating a savepoint from a version < 1.3 please make sure that no changes were made " + "to the parallelism of stateful operators."); Preconditions.checkArgument( operatorIDs.size() == taskState.getChainLength(), - "Detected change in chain length during migration for task " + jobVertex.getJobVertexId() +". " + + "Detected change in chain length during migration for task " + jobVertex.getJobVertexId() + ". " + "When migrating a savepoint from a version < 1.3 please make sure that the topology was not " + "changed by modification of a chain containing a stateful operator."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java index fb942f7..593bcf6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java @@ -26,11 +26,11 @@ import org.apache.flink.runtime.checkpoint.OperatorState; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; @@ -54,15 +54,15 @@ import java.util.UUID; /** * (De)serializer for checkpoint metadata format version 2. - * + * * <p>This format version adds - * + * * <p>Basic checkpoint metadata layout: * <pre> * +--------------+---------------+-----------------+ * | checkpointID | master states | operator states | * +--------------+---------------+-----------------+ - * + * * Master state: * +--------------+---------------------+---------+------+---------------+ * | magic number | num remaining bytes | version | name | payload bytes | @@ -73,7 +73,7 @@ import java.util.UUID; @VisibleForTesting public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { - /** Random magic number for consistency checks */ + /** Random magic number for consistency checks. */ private static final int MASTER_STATE_MAGIC_NUMBER = 0xc96b1696; private static final byte NULL_HANDLE = 0; @@ -83,12 +83,12 @@ public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4; private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5; - /** The singleton instance of the serializer */ + /** The singleton instance of the serializer. */ public static final SavepointV2Serializer INSTANCE = new SavepointV2Serializer(); // ------------------------------------------------------------------------ - /** Singleton, not meant to be instantiated */ + /** Singleton, not meant to be instantiated. */ private SavepointV2Serializer() {} // ------------------------------------------------------------------------ @@ -295,7 +295,7 @@ public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { // for compatibility, do not remove int len = dis.readInt(); - if (SavepointSerializers.FAIL_WHEN_LEGACY_STATE_DETECTED) { + if (SavepointSerializers.failWhenLegacyStateDetected) { Preconditions.checkState(len == 0, "Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is " + "no longer supported starting from Flink 1.4. Please rewrite your job to use " + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java index 7de66ef..05d20dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java @@ -131,7 +131,7 @@ public class ByteStreamStateHandle implements StreamStateHandle { @Override public int read(byte[] b, int off, int len) throws IOException { - // note that any bounds checking on "byte[] b" happend anyways by the + // note that any bounds checking on "byte[] b" happened anyways by the // System.arraycopy() call below, so we don't add extra checks here final int bytesLeft = data.length - index; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java index 602390b..abe1ae1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java @@ -24,6 +24,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.OperatorState; + import org.junit.Test; import java.io.DataInputStream; @@ -37,7 +38,7 @@ import java.util.Random; import static org.junit.Assert.assertEquals; /** - * Various tests for the version 2 format serializer of a checkpoint. + * Various tests for the version 2 format serializer of a checkpoint. */ public class SavepointV2SerializerTest { @@ -65,7 +66,7 @@ public class SavepointV2SerializerTest { final Collection<OperatorState> operatorStates = Collections.emptyList(); final int numMasterStates = rnd.nextInt(maxNumMasterStates) + 1; - final Collection<MasterState> masterStates = + final Collection<MasterState> masterStates = CheckpointTestUtils.createRandomMasterStates(rnd, numMasterStates); testCheckpointSerialization(checkpointId, operatorStates, masterStates); @@ -83,7 +84,7 @@ public class SavepointV2SerializerTest { final int numTasks = rnd.nextInt(maxTaskStates) + 1; final int numSubtasks = rnd.nextInt(maxNumSubtasks) + 1; - final Collection<OperatorState> taskStates = + final Collection<OperatorState> taskStates = CheckpointTestUtils.createOperatorStates(rnd, numTasks, numSubtasks); final Collection<MasterState> masterStates = Collections.emptyList(); @@ -139,8 +140,7 @@ public class SavepointV2SerializerTest { assertEquals(masterStates.size(), deserialized.getMasterStates().size()); for (Iterator<MasterState> a = masterStates.iterator(), b = deserialized.getMasterStates().iterator(); - a.hasNext();) - { + a.hasNext();) { CheckpointTestUtils.assertMasterStateEquality(a.next(), b.next()); } } diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml index cf985a9..87e3319 100644 --- a/tools/maven/suppressions-runtime.xml +++ b/tools/maven/suppressions-runtime.xml @@ -24,12 +24,8 @@ under the License. <suppressions> <suppress - files="(.*)runtime[/\\]checkpoint[/\\](.*)" - checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhite [...] - <!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory--> - <suppress files="(.*)test[/\\](.*)runtime[/\\]checkpoint[/\\](.*)" - checks="AvoidStarImport|FileLength|UnusedImports|NeedBraces"/> + checks="AvoidStarImport|FileLength|UnusedImports|NeedBraces|NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|Whites [...] <suppress files="(.*)runtime[/\\]client[/\\](.*)" checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhite [...]
