rickyspeak commented on a change in pull request #15393:
URL: https://github.com/apache/flink/pull/15393#discussion_r605949253



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.java
##########
@@ -127,7 +129,14 @@ private String createZkPath(JobID jobID) {
     private void writeEnumToZooKeeper(JobID jobID, JobSchedulingStatus status) throws Exception {
         LOG.debug("Setting scheduling state for job {} to {}.", jobID, status);
         final String zkPath = createZkPath(jobID);
-        this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
-        this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
+        final Stat stat = this.client.checkExists().forPath(zkPath);
+        if (stat != null) {
+            this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
+        } else {
+            this.client
+                    .create()
+                    .creatingParentContainersIfNeeded()
+                    .forPath(zkPath, status.name().getBytes(ENCODING));
+        }

Review comment:
The current test already exercises both code paths. `setJobRunning` is
called when there is no node, and `setJobFinished` is called when there is a
node. `clearJob` is called on an existing node. I added comments to this
effect, and I added another check for `clearJob` when there is no node. I also
added the loop as requested.
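
To make the coverage described above concrete, here is a minimal sketch of the same sequence of calls. It is not the test from this PR: `InMemoryRunningJobsRegistry`, its counters, the `check` helper and the path layout are hypothetical stand-ins that replace ZooKeeper with a map, and the three status values are assumed to mirror `JobSchedulingStatus`.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the ZooKeeper-backed registry (not Flink code).
class InMemoryRunningJobsRegistry {

    // Assumed to mirror the status values used by the registry under review.
    enum JobSchedulingStatus { PENDING, RUNNING, DONE }

    // Path -> payload, playing the role of znodes and their data.
    private final Map<String, byte[]> nodes = new HashMap<>();
    int creates = 0; // writes that took the "node missing" branch
    int updates = 0; // writes that took the "node exists" branch

    private static String createPath(String jobId) {
        return "/running_job_registry/" + jobId; // illustrative layout only
    }

    private void writeStatus(String jobId, JobSchedulingStatus status) {
        String path = createPath(jobId);
        byte[] data = status.name().getBytes(StandardCharsets.UTF_8);
        if (nodes.containsKey(path)) {
            nodes.put(path, data); // stands in for setData().forPath(...)
            updates++;
        } else {
            nodes.put(path, data); // stands in for create()...forPath(...)
            creates++;
        }
    }

    void setJobRunning(String jobId) { writeStatus(jobId, JobSchedulingStatus.RUNNING); }

    void setJobFinished(String jobId) { writeStatus(jobId, JobSchedulingStatus.DONE); }

    JobSchedulingStatus getStatus(String jobId) {
        byte[] data = nodes.get(createPath(jobId));
        return data == null
                ? JobSchedulingStatus.PENDING // no node means the job was never registered
                : JobSchedulingStatus.valueOf(new String(data, StandardCharsets.UTF_8));
    }

    // Removing a missing node is a no-op, matching the extra clearJob check.
    void clearJob(String jobId) { nodes.remove(createPath(jobId)); }

    static void check(boolean condition, String message) {
        if (!condition) throw new IllegalStateException(message);
    }

    public static void main(String[] args) {
        InMemoryRunningJobsRegistry registry = new InMemoryRunningJobsRegistry();
        for (int i = 0; i < 3; i++) { // loop over several jobs
            String jobId = "job-" + i;
            check(registry.getStatus(jobId) == JobSchedulingStatus.PENDING, "initially pending");
            registry.setJobRunning(jobId);  // no node yet: create branch
            check(registry.getStatus(jobId) == JobSchedulingStatus.RUNNING, "running");
            registry.setJobFinished(jobId); // node exists: update branch
            check(registry.getStatus(jobId) == JobSchedulingStatus.DONE, "done");
            registry.clearJob(jobId);       // clear an existing node
            registry.clearJob(jobId);       // clear again when there is no node
            check(registry.getStatus(jobId) == JobSchedulingStatus.PENDING, "pending after clear");
        }
        check(registry.creates == 3 && registry.updates == 3, "both branches hit once per job");
        System.out.println("all registry checks passed");
    }
}
```

The two counters exist only so the sketch can confirm that each job hits the create branch once and the update branch once. A test against a real ZooKeeper instance would presumably assert on the stored status instead.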





