This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 60fb96cccceffdad4ba0e7c7db43e623c75a93a9
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Feb 19 16:27:23 2021 +0100

    [hotfix][tests] Remove mock from testAddCheckpointWithFailedRemove
    
    Additionally, remove dead code and check that an
    exception was thrown.
---
 .../StandaloneCompletedCheckpointStoreTest.java    | 47 +++++++-------
 ...oKeeperCompletedCheckpointStoreMockitoTest.java | 71 ----------------------
 .../ZooKeeperCompletedCheckpointStoreTest.java     | 44 ++++++++++++++
 3 files changed, 68 insertions(+), 94 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index becc973..8e3635c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -22,23 +22,20 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 
+import static 
org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.powermock.api.mockito.PowerMockito.doReturn;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.mock;
 
 /** Tests for basic {@link CompletedCheckpointStore} contract. */
 public class StandaloneCompletedCheckpointStoreTest extends 
CompletedCheckpointStoreTest {
@@ -100,24 +97,28 @@ public class StandaloneCompletedCheckpointStoreTest 
extends CompletedCheckpointS
         CompletedCheckpointStore store =
                 createCompletedCheckpoints(numCheckpointsToRetain, 
Executors.directExecutor());
 
-        for (long i = 0; i <= numCheckpointsToRetain; ++i) {
-            CompletedCheckpoint checkpointToAdd = 
mock(CompletedCheckpoint.class);
-            doReturn(i).when(checkpointToAdd).getCheckpointID();
-            
doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
-            doThrow(new 
IOException()).when(checkpointToAdd).discardOnSubsume();
-
-            try {
-                store.addCheckpoint(checkpointToAdd, new CheckpointsCleaner(), 
() -> {});
-
-                // The checkpoint should be in the store if we successfully 
add it into the store.
-                List<CompletedCheckpoint> addedCheckpoints = 
store.getAllCheckpoints();
-                assertTrue(addedCheckpoints.contains(checkpointToAdd));
-            } catch (Exception e) {
-                // The checkpoint should not be in the store if any exception 
is thrown.
-                List<CompletedCheckpoint> addedCheckpoints = 
store.getAllCheckpoints();
-                assertFalse(addedCheckpoints.contains(checkpointToAdd));
-            }
+        CountDownLatch discardAttempted = new CountDownLatch(1);
+        for (long i = 0; i < numCheckpointsToRetain + 1; ++i) {
+            CompletedCheckpoint checkpointToAdd =
+                    new CompletedCheckpoint(
+                            new JobID(),
+                            i,
+                            i,
+                            i,
+                            Collections.emptyMap(),
+                            Collections.emptyList(),
+                            
CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
+                            new TestCompletedCheckpointStorageLocation()) {
+                        @Override
+                        public boolean discardOnSubsume() {
+                            discardAttempted.countDown();
+                            throw new RuntimeException();
+                        }
+                    };
+            // shouldn't fail despite the exception
+            store.addCheckpoint(checkpointToAdd, new CheckpointsCleaner(), () 
-> {});
         }
+        discardAttempted.await();
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
index 8b7c9fc..af8c091 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -42,20 +42,15 @@ import org.mockito.stubbing.Answer;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -363,70 +358,4 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest 
extends TestLogger {
         // are subsumed should they be discarded.
         verify(failingRetrievableStateHandle, never()).discardState();
     }
-
-    /**
-     * Tests that the checkpoint does not exist in the store when we fail to 
add it into the store
-     * (i.e., there exists an exception thrown by the method).
-     */
-    @Test
-    public void testAddCheckpointWithFailedRemove() throws Exception {
-        final CuratorFramework client = mock(CuratorFramework.class, 
Mockito.RETURNS_DEEP_STUBS);
-        final RetrievableStateStorageHelper<CompletedCheckpoint> 
storageHelperMock =
-                mock(RetrievableStateStorageHelper.class);
-
-        ZooKeeperStateHandleStore<CompletedCheckpoint> 
zookeeperStateHandleStoreMock =
-                spy(new ZooKeeperStateHandleStore<>(client, 
storageHelperMock));
-
-        doAnswer(
-                        new 
Answer<RetrievableStateHandle<CompletedCheckpoint>>() {
-                            @Override
-                            public RetrievableStateHandle<CompletedCheckpoint> 
answer(
-                                    InvocationOnMock invocationOnMock) throws 
Throwable {
-                                CompletedCheckpoint checkpoint =
-                                        (CompletedCheckpoint) 
invocationOnMock.getArguments()[1];
-
-                                RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle =
-                                        mock(RetrievableStateHandle.class);
-                                
when(retrievableStateHandle.retrieveState()).thenReturn(checkpoint);
-
-                                return retrievableStateHandle;
-                            }
-                        })
-                .when(zookeeperStateHandleStoreMock)
-                .addAndLock(anyString(), any(CompletedCheckpoint.class));
-
-        doThrow(new Exception())
-                .when(zookeeperStateHandleStoreMock)
-                .releaseAndTryRemove(anyString());
-
-        final int numCheckpointsToRetain = 1;
-
-        CompletedCheckpointStore zooKeeperCompletedCheckpointStore =
-                new DefaultCompletedCheckpointStore<>(
-                        numCheckpointsToRetain,
-                        zookeeperStateHandleStoreMock,
-                        zooKeeperCheckpointStoreUtil,
-                        Executors.directExecutor());
-
-        for (long i = 0; i <= numCheckpointsToRetain; ++i) {
-            CompletedCheckpoint checkpointToAdd = 
mock(CompletedCheckpoint.class);
-            doReturn(i).when(checkpointToAdd).getCheckpointID();
-            
doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
-
-            try {
-                zooKeeperCompletedCheckpointStore.addCheckpoint(
-                        checkpointToAdd, new CheckpointsCleaner(), () -> {});
-
-                // The checkpoint should be in the store if we successfully 
add it into the store.
-                List<CompletedCheckpoint> addedCheckpoints =
-                        zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-                assertTrue(addedCheckpoints.contains(checkpointToAdd));
-            } catch (Exception e) {
-                // The checkpoint should not be in the store if any exception 
is thrown.
-                List<CompletedCheckpoint> addedCheckpoints =
-                        zooKeeperCompletedCheckpointStore.getAllCheckpoints();
-                assertFalse(addedCheckpoints.contains(checkpointToAdd));
-            }
-        }
-    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 6b4e682..7e417fc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -26,6 +27,7 @@ import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
@@ -42,10 +44,13 @@ import org.junit.Test;
 import javax.annotation.Nonnull;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.function.Function;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
 import static 
org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.createCheckpoint;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.rethrow;
@@ -301,4 +306,43 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
             return 0;
         }
     }
+
+    /**
+     * Tests that adding a checkpoint does not fail when the callback passed to addCheckpoint
+     * throws an exception, and that the callback is actually invoked.
+     */
+    @Test
+    public void testAddCheckpointWithFailedRemove() throws Exception {
+
+        final int numCheckpointsToRetain = 1;
+        final Configuration configuration = new Configuration();
+        configuration.setString(
+                HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zooKeeperResource.getConnectString());
+
+        final CuratorFramework client = 
ZooKeeperUtils.startCuratorFramework(configuration);
+        final CompletedCheckpointStore store = 
createZooKeeperCheckpointStore(client);
+
+        CountDownLatch discardAttempted = new CountDownLatch(1);
+        for (long i = 0; i < numCheckpointsToRetain + 1; ++i) {
+            CompletedCheckpoint checkpointToAdd =
+                    new CompletedCheckpoint(
+                            new JobID(),
+                            i,
+                            i,
+                            i,
+                            Collections.emptyMap(),
+                            Collections.emptyList(),
+                            
CheckpointProperties.forCheckpoint(NEVER_RETAIN_AFTER_TERMINATION),
+                            new TestCompletedCheckpointStorageLocation());
+            // shouldn't fail despite the exception
+            store.addCheckpoint(
+                    checkpointToAdd,
+                    new CheckpointsCleaner(),
+                    () -> {
+                        discardAttempted.countDown();
+                        throw new RuntimeException();
+                    });
+        }
+        discardAttempted.await();
+    }
 }

Reply via email to