This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 60fb96cccceffdad4ba0e7c7db43e623c75a93a9 Author: Roman Khachatryan <[email protected]> AuthorDate: Fri Feb 19 16:27:23 2021 +0100 [hotfix][tests] Remove mock from testAddCheckpointWithFailedRemove Additionally, remove dead code and check that an exception was thrown. --- .../StandaloneCompletedCheckpointStoreTest.java | 47 +++++++------- ...oKeeperCompletedCheckpointStoreMockitoTest.java | 71 ---------------------- .../ZooKeeperCompletedCheckpointStoreTest.java | 44 ++++++++++++++ 3 files changed, 68 insertions(+), 94 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java index becc973..8e3635c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java @@ -22,23 +22,20 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; import org.junit.Test; -import java.io.IOException; import java.util.Collection; import java.util.Collections; -import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.powermock.api.mockito.PowerMockito.doReturn; -import static org.powermock.api.mockito.PowerMockito.doThrow; -import static org.powermock.api.mockito.PowerMockito.mock; /** Tests for basic {@link CompletedCheckpointStore} contract. */ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointStoreTest { @@ -100,24 +97,28 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS CompletedCheckpointStore store = createCompletedCheckpoints(numCheckpointsToRetain, Executors.directExecutor()); - for (long i = 0; i <= numCheckpointsToRetain; ++i) { - CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class); - doReturn(i).when(checkpointToAdd).getCheckpointID(); - doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates(); - doThrow(new IOException()).when(checkpointToAdd).discardOnSubsume(); - - try { - store.addCheckpoint(checkpointToAdd, new CheckpointsCleaner(), () -> {}); - - // The checkpoint should be in the store if we successfully add it into the store. - List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints(); - assertTrue(addedCheckpoints.contains(checkpointToAdd)); - } catch (Exception e) { - // The checkpoint should not be in the store if any exception is thrown. - List<CompletedCheckpoint> addedCheckpoints = store.getAllCheckpoints(); - assertFalse(addedCheckpoints.contains(checkpointToAdd)); - } + CountDownLatch discardAttempted = new CountDownLatch(1); + for (long i = 0; i < numCheckpointsToRetain + 1; ++i) { + CompletedCheckpoint checkpointToAdd = + new CompletedCheckpoint( + new JobID(), + i, + i, + i, + Collections.emptyMap(), + Collections.emptyList(), + CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION), + new TestCompletedCheckpointStorageLocation()) { + @Override + public boolean discardOnSubsume() { + discardAttempted.countDown(); + throw new RuntimeException(); + } + }; + // should fail despite the exception + store.addCheckpoint(checkpointToAdd, new CheckpointsCleaner(), () -> {}); } + discardAttempted.await(); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java index 8b7c9fc..af8c091 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java @@ -42,20 +42,15 @@ import org.mockito.stubbing.Answer; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.concurrent.Executor; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -363,70 +358,4 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest extends TestLogger { // are subsumed should they be discarded. verify(failingRetrievableStateHandle, never()).discardState(); } - - /** - * Tests that the checkpoint does not exist in the store when we fail to add it into the store - * (i.e., there exists an exception thrown by the method). - */ - @Test - public void testAddCheckpointWithFailedRemove() throws Exception { - final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS); - final RetrievableStateStorageHelper<CompletedCheckpoint> storageHelperMock = - mock(RetrievableStateStorageHelper.class); - - ZooKeeperStateHandleStore<CompletedCheckpoint> zookeeperStateHandleStoreMock = - spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock)); - - doAnswer( - new Answer<RetrievableStateHandle<CompletedCheckpoint>>() { - @Override - public RetrievableStateHandle<CompletedCheckpoint> answer( - InvocationOnMock invocationOnMock) throws Throwable { - CompletedCheckpoint checkpoint = - (CompletedCheckpoint) invocationOnMock.getArguments()[1]; - - RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle = - mock(RetrievableStateHandle.class); - when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint); - - return retrievableStateHandle; - } - }) - .when(zookeeperStateHandleStoreMock) - .addAndLock(anyString(), any(CompletedCheckpoint.class)); - - doThrow(new Exception()) - .when(zookeeperStateHandleStoreMock) - .releaseAndTryRemove(anyString()); - - final int numCheckpointsToRetain = 1; - - CompletedCheckpointStore zooKeeperCompletedCheckpointStore = - new DefaultCompletedCheckpointStore<>( - numCheckpointsToRetain, - zookeeperStateHandleStoreMock, - zooKeeperCheckpointStoreUtil, - Executors.directExecutor()); - - for (long i = 0; i <= numCheckpointsToRetain; ++i) { - CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class); - doReturn(i).when(checkpointToAdd).getCheckpointID(); - doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates(); - - try { - zooKeeperCompletedCheckpointStore.addCheckpoint( - checkpointToAdd, new CheckpointsCleaner(), () -> {}); - - // The checkpoint should be in the store if we successfully add it into the store. - List<CompletedCheckpoint> addedCheckpoints = - zooKeeperCompletedCheckpointStore.getAllCheckpoints(); - assertTrue(addedCheckpoints.contains(checkpointToAdd)); - } catch (Exception e) { - // The checkpoint should not be in the store if any exception is thrown. - List<CompletedCheckpoint> addedCheckpoints = - zooKeeperCompletedCheckpointStore.getAllCheckpoints(); - assertFalse(addedCheckpoints.contains(checkpointToAdd)); - } - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index 6b4e682..7e417fc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; @@ -26,6 +27,7 @@ import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.runtime.zookeeper.ZooKeeperResource; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; @@ -42,10 +44,13 @@ import org.junit.Test; import javax.annotation.Nonnull; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.function.Function; import java.util.stream.IntStream; +import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION; import static org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.createCheckpoint; import static org.apache.flink.util.ExceptionUtils.findThrowable; import static org.apache.flink.util.ExceptionUtils.rethrow; @@ -301,4 +306,43 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { return 0; } } + + /** + * Tests that the checkpoint does not exist in the store when we fail to add it into the store + * (i.e., there exists an exception thrown by the method). + */ + @Test + public void testAddCheckpointWithFailedRemove() throws Exception { + + final int numCheckpointsToRetain = 1; + final Configuration configuration = new Configuration(); + configuration.setString( + HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString()); + + final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration); + final CompletedCheckpointStore store = createZooKeeperCheckpointStore(client); + + CountDownLatch discardAttempted = new CountDownLatch(1); + for (long i = 0; i < numCheckpointsToRetain + 1; ++i) { + CompletedCheckpoint checkpointToAdd = + new CompletedCheckpoint( + new JobID(), + i, + i, + i, + Collections.emptyMap(), + Collections.emptyList(), + CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION), + new TestCompletedCheckpointStorageLocation()); + // shouldn't fail despite the exception + store.addCheckpoint( + checkpointToAdd, + new CheckpointsCleaner(), + () -> { + discardAttempted.countDown(); + throw new RuntimeException(); + }); + } + discardAttempted.await(); + } }
