XComp commented on a change in pull request #19191:
URL: https://github.com/apache/flink/pull/19191#discussion_r831241857



##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java
##########
@@ -50,6 +55,126 @@ public void testGetAndIncrement() throws Exception {
         };
     }
 
+    @Test
+    public void testShutdown() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesCheckpointIDCounter checkpointIDCounter =
+                                    new KubernetesCheckpointIDCounter(
+                                            flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+                            checkpointIDCounter.start();
+
+                            checkpointIDCounter.setCount(100L);
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .get(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is("100"));
+
+                            checkpointIDCounter.shutdown(JobStatus.FINISHED).join();
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is(false));
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testShutdownForLocallyTerminatedJobStatus() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesCheckpointIDCounter checkpointIDCounter =
+                                    new KubernetesCheckpointIDCounter(
+                                            flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+                            checkpointIDCounter.start();
+
+                            checkpointIDCounter.setCount(100L);
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .get(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is("100"));
+
+                            checkpointIDCounter.shutdown(JobStatus.SUSPENDED).join();
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is(true));
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testIdempotentShutdown() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesCheckpointIDCounter checkpointIDCounter =
+                                    new KubernetesCheckpointIDCounter(
+                                            flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+                            checkpointIDCounter.start();
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is(false));
+
+                            checkpointIDCounter.shutdown(JobStatus.FINISHED).join();
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is(false));
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testShutdownFailureDueToMissingConfigMap() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesCheckpointIDCounter checkpointIDCounter =
+                                    new KubernetesCheckpointIDCounter(
+                                            flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+                            checkpointIDCounter.start();
+
+                            flinkKubeClient.deleteConfigMap(LEADER_CONFIGMAP_NAME);
+
+                            assertThrows(
+                                    CompletionException.class,
+                                    () -> checkpointIDCounter.shutdown(JobStatus.FINISHED).get());

Review comment:
       Deleting the `ConfigMap` is just a tool for me to create an inconsistent state that, as a consequence, causes failures when accessing it. I want to verify that we do proper failure handling here, i.e. propagate the actual failure instead of just "swallowing" it (as we did before, as far as I understand).
   
   It's not ok for the `ConfigMap` to be removed in the general case. It's simply the only way I could think of to generate an error on the ZK side, as an alternative to mocking the curator-internal ZK call.
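   
   To make the propagate-vs-swallow contract concrete, here is a minimal, self-contained sketch. The names (`removeCounterKey`, `ShutdownPropagationSketch`) are hypothetical stand-ins, not the actual `KubernetesCheckpointIDCounter` implementation; only the future-handling pattern is the point:
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   
   public class ShutdownPropagationSketch {
   
       // Hypothetical stand-in for the ConfigMap update call; it fails
       // exceptionally when the ConfigMap has already been deleted.
       static CompletableFuture<Void> removeCounterKey(boolean configMapExists) {
           if (!configMapExists) {
               return CompletableFuture.failedFuture(
                       new IllegalStateException("ConfigMap does not exist"));
           }
           return CompletableFuture.completedFuture(null);
       }
   
       static CompletableFuture<Void> shutdown(
               boolean globallyTerminal, boolean configMapExists) {
           if (!globallyTerminal) {
               // Locally terminated (e.g. SUSPENDED): keep the counter for recovery.
               return CompletableFuture.completedFuture(null);
           }
           // Return the future unchanged; tacking .exceptionally(t -> null)
           // onto it here would be the "swallowing" behavior the test rules out.
           return removeCounterKey(configMapExists);
       }
   
       public static void main(String[] args) {
           try {
               shutdown(true, false).join();
           } catch (CompletionException e) {
               // join() rethrows the propagated failure as a CompletionException,
               // so the caller sees the actual cause instead of a silent success.
               System.out.println("propagated: " + e.getCause());
           }
       }
   }
   ```
   
   The key design point is that no `catch`-and-log sits between the client call and the returned future, so `join()` can surface the failure as an unchecked `CompletionException` to the caller.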



