This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 59141399bb2eff1e40eacf80decffc743f60cb36 Author: Till Rohrmann <[email protected]> AuthorDate: Thu Nov 18 10:01:13 2021 +0100 [hotfix][tests] Remove Mocking from TaskLocalStateStoreImplTest Mockito's spy prevented TaskLocalStateStoreImplTest from succeeding because a mock cannot be serialized. --- .../runtime/state/TaskLocalStateStoreImplTest.java | 55 ++++++++++++++-------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java index 9af844b..ce57073 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.Executors; import org.junit.After; @@ -31,7 +32,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.mockito.Mockito; import java.io.File; import java.util.ArrayList; @@ -39,9 +39,11 @@ import java.util.List; import java.util.SortedMap; import java.util.TreeMap; -import static org.powermock.api.mockito.PowerMockito.spy; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; -public class TaskLocalStateStoreImplTest { +/** Test for the {@link TaskLocalStateStoreImpl}. */ +public class TaskLocalStateStoreImplTest extends TestLogger { private SortedMap<Long, TaskStateSnapshot> internalSnapshotMap; private Object internalLock; @@ -113,7 +115,7 @@ public class TaskLocalStateStoreImplTest { Assert.assertNull(taskLocalStateStore.retrieveLocalState(i)); } - List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount); + List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount); checkStoredAsExpected(taskStateSnapshots, 0, chkCount); @@ -126,7 +128,7 @@ public class TaskLocalStateStoreImplTest { final int chkCount = 3; - List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount); + List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount); // test retrieve with pruning taskLocalStateStore.pruneMatchingCheckpoints((long chk) -> chk != chkCount - 1); @@ -144,7 +146,7 @@ public class TaskLocalStateStoreImplTest { final int chkCount = 3; final int confirmed = chkCount - 1; - List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount); + List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount); taskLocalStateStore.confirmCheckpoint(confirmed); checkPrunedAndDiscarded(taskStateSnapshots, 0, confirmed); checkStoredAsExpected(taskStateSnapshots, confirmed, chkCount); @@ -156,7 +158,7 @@ public class TaskLocalStateStoreImplTest { final int chkCount = 4; final int aborted = chkCount - 2; - List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount); + List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount); taskLocalStateStore.abortCheckpoint(aborted); checkPrunedAndDiscarded(taskStateSnapshots, aborted, aborted + 1); checkStoredAsExpected(taskStateSnapshots, 0, aborted); @@ -170,35 +172,34 @@ public class TaskLocalStateStoreImplTest { public void dispose() throws Exception { final int chkCount = 3; final int confirmed = chkCount - 1; - List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount); + List<TestingTaskStateSnapshot> taskStateSnapshots = storeStates(chkCount); taskLocalStateStore.confirmCheckpoint(confirmed); taskLocalStateStore.dispose(); checkPrunedAndDiscarded(taskStateSnapshots, 0, chkCount); } - private void checkStoredAsExpected(List<TaskStateSnapshot> history, int start, int end) - throws Exception { + private void checkStoredAsExpected(List<TestingTaskStateSnapshot> history, int start, int end) { for (int i = start; i < end; ++i) { - TaskStateSnapshot expected = history.get(i); - Assert.assertTrue(expected == taskLocalStateStore.retrieveLocalState(i)); - Mockito.verify(expected, Mockito.never()).discardState(); + TestingTaskStateSnapshot expected = history.get(i); + assertTrue(expected == taskLocalStateStore.retrieveLocalState(i)); + assertFalse(expected.isDiscarded()); } } - private void checkPrunedAndDiscarded(List<TaskStateSnapshot> history, int start, int end) - throws Exception { + private void checkPrunedAndDiscarded( + List<TestingTaskStateSnapshot> history, int start, int end) { for (int i = start; i < end; ++i) { Assert.assertNull(taskLocalStateStore.retrieveLocalState(i)); - Mockito.verify(history.get(i)).discardState(); + assertTrue(history.get(i).isDiscarded()); } } - private List<TaskStateSnapshot> storeStates(int count) { - List<TaskStateSnapshot> taskStateSnapshots = new ArrayList<>(count); + private List<TestingTaskStateSnapshot> storeStates(int count) { + List<TestingTaskStateSnapshot> taskStateSnapshots = new ArrayList<>(count); for (int i = 0; i < count; ++i) { OperatorID operatorID = new OperatorID(); - TaskStateSnapshot taskStateSnapshot = spy(new TaskStateSnapshot()); + TestingTaskStateSnapshot taskStateSnapshot = new TestingTaskStateSnapshot(); OperatorSubtaskState operatorSubtaskState = OperatorSubtaskState.builder().build(); taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState); taskLocalStateStore.storeLocalState(i, taskStateSnapshot); @@ -206,4 +207,20 @@ public class TaskLocalStateStoreImplTest { } return taskStateSnapshots; } + + private static final class TestingTaskStateSnapshot extends TaskStateSnapshot { + private static final long serialVersionUID = 2046321877379917040L; + + private boolean isDiscarded = false; + + @Override + public void discardState() throws Exception { + super.discardState(); + isDiscarded = true; + } + + boolean isDiscarded() { + return isDiscarded; + } + } }
