XComp commented on a change in pull request #19191:
URL: https://github.com/apache/flink/pull/19191#discussion_r831279324



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
##########
@@ -118,11 +122,46 @@ public void start() throws Exception {
 
                 if (jobStatus.isGloballyTerminalState()) {
                     LOG.info("Removing {} from ZooKeeper", counterPath);
+                    final CompletableFuture<Void> deletionFuture = new 
CompletableFuture<>();
                     try {
-                        client.delete().inBackground().forPath(counterPath);
+                        client.delete()
+                                .inBackground(
+                                        (curatorFramework, curatorEvent) -> {
+                                            Preconditions.checkArgument(
+                                                    curatorEvent.getType()
+                                                            == 
CuratorEventType.DELETE,
+                                                    "An unexpected 
CuratorEvent was monitored: "
+                                                            + 
curatorEvent.getType());
+                                            Preconditions.checkArgument(
+                                                    
counterPath.equals(curatorEvent.getPath()),
+                                                    "An unexpected path was 
selected for deletion: "
+                                                            + 
curatorEvent.getPath());
+
+                                            final KeeperException.Code 
eventCode =
+                                                    KeeperException.Code.get(
+                                                            
curatorEvent.getResultCode());
+                                            if (Sets.immutableEnumSet(
+                                                            
KeeperException.Code.OK,
+                                                            
KeeperException.Code.NONODE)
+                                                    .contains(eventCode)) {
+                                                deletionFuture.complete(null);
+                                            } else {
+                                                
deletionFuture.completeExceptionally(
+                                                        KeeperException.create(
+                                                                
KeeperException.Code.get(
+                                                                        
curatorEvent
+                                                                               
 .getResultCode())));

Review comment:
       I updated the test code once more. Essentially, it will show the 
stacktrace of a generic error-case-specific `KeeperException`. For the 
`testShutdownWithFailureDueToExistingChildNodes` generates the following 
stacktrace:
   ```
   java.util.concurrent.CompletionException: 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$NotEmptyException:
 KeeperErrorCode = Directory not empty for /checkpoint_id_counter
   
        at 
java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
        at 
java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
        at 
org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounterITCase.testShutdownWithFailureDueToExistingChildNodes(ZooKeeperCheckpointIDCounterITCase.java:114)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
        at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
        at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
        at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
        at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
        at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
        at 
org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
        at 
org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
        at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
        at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
        at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
        at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
        at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
        at 
org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
        at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
        at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
        at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
        at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
        at java.util.ArrayList.forEach(ArrayList.java:1259)
        at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
        at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
        at java.util.ArrayList.forEach(ArrayList.java:1259)
        at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
        at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
        at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
        at 
org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
        at 
org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
        at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
        at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
        at 
org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
        at 
org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
        at 
com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
        at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
        at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
   Caused by: 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$NotEmptyException:
 KeeperErrorCode = Directory not empty for /checkpoint_id_counter
        at 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException.create(KeeperException.java:132)
        at 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
        at 
org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter.handleDeletionOfCounterPath(ZooKeeperCheckpointIDCounter.java:167)
        at 
org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter.lambda$shutdown$0(ZooKeeperCheckpointIDCounter.java:131)
        at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.sendToBackgroundCallback(CuratorFrameworkImpl.java:926)
        at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.CuratorFrameworkImpl.processBackgroundOperation(CuratorFrameworkImpl.java:683)
        at 
org.apache.flink.shaded.curator5.org.apache.curator.framework.imps.DeleteBuilderImpl$2.processResult(DeleteBuilderImpl.java:207)
        at 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:675)
        at 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:510)
   ```
   
   There's a log message printed on INFO beforehand in 
[ZooKeeperCheckpointIDCounter:L113](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java#L113)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to