zentol commented on a change in pull request #19191:
URL: https://github.com/apache/flink/pull/19191#discussion_r831381056



##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java
##########
@@ -50,6 +55,131 @@ public void testGetAndIncrement() throws Exception {
         };
     }
 
+    @Test
+    public void testShutdown() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesCheckpointIDCounter checkpointIDCounter =
+                                    new KubernetesCheckpointIDCounter(
+                                            flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+                            checkpointIDCounter.start();
+
+                            checkpointIDCounter.setCount(100L);
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .get(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is("100"));
+
+                            checkpointIDCounter.shutdown(JobStatus.FINISHED).join();
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is(false));
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testShutdownForLocallyTerminatedJobStatus() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesCheckpointIDCounter checkpointIDCounter =
+                                    new KubernetesCheckpointIDCounter(
+                                            flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+                            checkpointIDCounter.start();
+
+                            checkpointIDCounter.setCount(100L);
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .get(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is("100"));
+
+                            checkpointIDCounter.shutdown(JobStatus.SUSPENDED).join();
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is(true));
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testIdempotentShutdown() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesCheckpointIDCounter checkpointIDCounter =
+                                    new KubernetesCheckpointIDCounter(
+                                            flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+                            checkpointIDCounter.start();
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is(false));
+
+                            checkpointIDCounter.shutdown(JobStatus.FINISHED).join();
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is(false));
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testShutdownFailureDueToMissingConfigMap() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesCheckpointIDCounter checkpointIDCounter =
+                                    new KubernetesCheckpointIDCounter(
+                                            flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+                            checkpointIDCounter.start();
+
+                            // deleting the ConfigMap from outside of the CheckpointIDCounter while
+                            // still using the counter (which is stored as an entry in the
+                            // ConfigMap) causes an unexpected failure which we want to simulate
+                            // here
+                            flinkKubeClient.deleteConfigMap(LEADER_CONFIGMAP_NAME);
+
+                            assertThrows(
+                                    "The shutdown should fail due to the illegal state of the test setup.",

Review comment:
       this message doesn't really provide any value. That some part of the 
test setup is supposed to make this fail is obvious.
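
       A minimal sketch of the simplification, assuming the truncated call expects the shutdown future to fail on join() (the expected exception type and the runnable are cut off in the hunk, so `CompletionException` is an assumption here):

```java
// sketch only: drop the message and keep the expected exception plus the call under test
// (assumes org.junit.Assert.assertThrows and that join() surfaces a CompletionException)
assertThrows(
        CompletionException.class,
        () -> checkpointIDCounter.shutdown(JobStatus.FINISHED).join());
```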

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounterITCase.java
##########
@@ -62,10 +69,70 @@ public void testShutdownRemovesState() throws Exception {
         CuratorFramework client = zookeeper.getClient();
         assertThat(client.checkExists().forPath(counter.getPath())).isNotNull();
 
-        counter.shutdown(JobStatus.FINISHED);
+        counter.shutdown(JobStatus.FINISHED).join();
+        assertThat(client.checkExists().forPath(counter.getPath())).isNull();
+    }
+
+    @Test
+    public void testIdempotentShutdown() throws Exception {
+        ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
+        counter.start();
+
+        CuratorFramework client = zookeeper.getClient();
+        counter.shutdown(JobStatus.FINISHED).join();
+
+        // shutdown shouldn't fail due to missing path
+        counter.shutdown(JobStatus.FINISHED).join();
         assertThat(client.checkExists().forPath(counter.getPath())).isNull();
     }
 
+    @Test
+    public void testShutdownWithFailureDueToMissingConnection() throws Exception {
+        ZooKeeperCheckpointIDCounter counter = createCheckpointIdCounter();
+        counter.start();
+
+        cleanAndStopZooKeeperIfRunning();
+
+        assertThat(counter.shutdown(JobStatus.FINISHED))
+                .as("The shutdown should fail because of the client connection being dropped.")
+                .failsWithin(Duration.ofHours(1));

Review comment:
       Not really a fan of using such dummy timeouts. I'd rather call get() on 
the future and simplify the whole thing. We shouldn't force ourselves to use 
the CompletableFuture assertions if they are incompatible with our testing 
philosophy.
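
       A sketch of the suggested simplification, letting `get()` surface the failure instead of waiting on a dummy timeout; the AssertJ `assertThatThrownBy` style and the `ExecutionException` wrapper are assumptions, not part of the PR:

```java
// sketch only: call get() and assert on the thrown exception
// (assumes org.assertj.core.api.Assertions.assertThatThrownBy is available in this test)
assertThatThrownBy(() -> counter.shutdown(JobStatus.FINISHED).get())
        .as("The shutdown should fail because of the client connection being dropped.")
        .isInstanceOf(ExecutionException.class);
```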

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
##########
@@ -101,22 +110,66 @@ public void start() throws Exception {
     }
 
     @Override
-    public void shutdown(JobStatus jobStatus) throws Exception {
+    public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
         synchronized (startStopLock) {
             if (isStarted) {
                 LOG.info("Shutting down.");
-                sharedCount.close();
+                try {
+                    sharedCount.close();
+                } catch (IOException e) {
+                    return FutureUtils.completedExceptionally(e);
+                }
 
                 client.getConnectionStateListenable().removeListener(connectionStateListener);
 
                 if (jobStatus.isGloballyTerminalState()) {
                     LOG.info("Removing {} from ZooKeeper", counterPath);
-                    client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath);
+                    final CompletableFuture<Void> deletionFuture = new CompletableFuture<>();
+                    try {
+                        client.delete()
+                                .inBackground(
+                                        (curatorFramework, curatorEvent) ->
+                                                handleDeletionOfCounterPath(
+                                                        curatorEvent, deletionFuture))
+                                .forPath(counterPath);

Review comment:
       could create and return the deletion future within the try block
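
       Roughly what that could look like (a sketch; the surrounding catch clause is assumed to keep returning `FutureUtils.completedExceptionally`, which is not visible in the hunk):

```java
if (jobStatus.isGloballyTerminalState()) {
    LOG.info("Removing {} from ZooKeeper", counterPath);
    try {
        // create and return the deletion future from within the try block
        final CompletableFuture<Void> deletionFuture = new CompletableFuture<>();
        client.delete()
                .inBackground(
                        (curatorFramework, curatorEvent) ->
                                handleDeletionOfCounterPath(curatorEvent, deletionFuture))
                .forPath(counterPath);
        return deletionFuture;
    } catch (Exception e) {
        return FutureUtils.completedExceptionally(e);
    }
}
```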

##########
File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounterTest.java
##########
@@ -50,6 +55,131 @@ public void testGetAndIncrement() throws Exception {
         };
     }
 
+    @Test
+    public void testShutdown() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesCheckpointIDCounter checkpointIDCounter =
+                                    new KubernetesCheckpointIDCounter(
+                                            flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+                            checkpointIDCounter.start();
+
+                            checkpointIDCounter.setCount(100L);
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .get(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is("100"));
+
+                            checkpointIDCounter.shutdown(JobStatus.FINISHED).join();
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is(false));
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testShutdownForLocallyTerminatedJobStatus() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesCheckpointIDCounter checkpointIDCounter =
+                                    new KubernetesCheckpointIDCounter(
+                                            flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+                            checkpointIDCounter.start();
+
+                            checkpointIDCounter.setCount(100L);
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .get(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is("100"));
+
+                            checkpointIDCounter.shutdown(JobStatus.SUSPENDED).join();
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is(true));
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testIdempotentShutdown() throws Exception {
+        new Context() {
+            {
+                runTest(
+                        () -> {
+                            leaderCallbackGrantLeadership();
+
+                            final KubernetesCheckpointIDCounter checkpointIDCounter =
+                                    new KubernetesCheckpointIDCounter(
+                                            flinkKubeClient, LEADER_CONFIGMAP_NAME, LOCK_IDENTITY);
+                            checkpointIDCounter.start();
+
+                            assertThat(
+                                    getLeaderConfigMap()
+                                            .getData()
+                                            .containsKey(Constants.CHECKPOINT_COUNTER_KEY),
+                                    is(false));

Review comment:
       I'd prefer this to follow the pattern in the ZK test where we start out with data in the config map, and then initiate the shutdown twice. As is, the shutdown method could fail catastrophically if called twice in a row, rendering it non-idempotent for our purposes.
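
       A rough sketch of that shape, reusing the helpers from the surrounding tests (whether the second shutdown call tolerates the already-removed counter entry is exactly what this would verify):

```java
checkpointIDCounter.setCount(100L);

assertThat(
        getLeaderConfigMap().getData().get(Constants.CHECKPOINT_COUNTER_KEY),
        is("100"));

checkpointIDCounter.shutdown(JobStatus.FINISHED).join();
// a second shutdown shouldn't fail even though the counter entry is already gone
checkpointIDCounter.shutdown(JobStatus.FINISHED).join();

assertThat(
        getLeaderConfigMap().getData().containsKey(Constants.CHECKPOINT_COUNTER_KEY),
        is(false));
```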



