This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 273108a4e3672f9eea772ae9444bcb98aa46c396
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Tue Dec 28 17:36:00 2021 +0100

    [hotfix] Simplify TaskLocalStateStoreImpl by removing testing constructor
---
 .../runtime/state/TaskLocalStateStoreImpl.java     | 27 ++--------------------
 1 file changed, 2 insertions(+), 25 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
index 87f4e0c..eef6918 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
@@ -87,7 +87,7 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore {
     @Nonnull private final Executor discardExecutor;
 
     /** Lock for synchronisation on the storage map and the discarded status. */
-    @Nonnull private final Object lock;
+    @Nonnull private final Object lock = new Object();
 
     /** Status flag if this store was already discarded. */
     @GuardedBy("lock")
@@ -106,36 +106,13 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore {
             @Nonnull LocalRecoveryConfig localRecoveryConfig,
             @Nonnull Executor discardExecutor) {
 
-        this(
-                jobID,
-                allocationID,
-                jobVertexID,
-                subtaskIndex,
-                localRecoveryConfig,
-                discardExecutor,
-                new TreeMap<>(),
-                new Object());
-    }
-
-    @VisibleForTesting
-    TaskLocalStateStoreImpl(
-            @Nonnull JobID jobID,
-            @Nonnull AllocationID allocationID,
-            @Nonnull JobVertexID jobVertexID,
-            @Nonnegative int subtaskIndex,
-            @Nonnull LocalRecoveryConfig localRecoveryConfig,
-            @Nonnull Executor discardExecutor,
-            @Nonnull SortedMap<Long, TaskStateSnapshot> storedTaskStateByCheckpointID,
-            @Nonnull Object lock) {
-
         this.jobID = jobID;
         this.allocationID = allocationID;
         this.jobVertexID = jobVertexID;
         this.subtaskIndex = subtaskIndex;
         this.discardExecutor = discardExecutor;
         this.localRecoveryConfig = localRecoveryConfig;
-        this.storedTaskStateByCheckpointID = storedTaskStateByCheckpointID;
-        this.lock = lock;
+        this.storedTaskStateByCheckpointID = new TreeMap<>();
         this.disposed = false;
     }
 