mxm commented on code in PR #13795:
URL: https://github.com/apache/iceberg/pull/13795#discussion_r2281581690


##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZkLockFactory.java:
##########
@@ -46,6 +44,7 @@ public class ZkLockFactory implements TriggerLockFactory {
   private transient CuratorFramework client;
   private transient SharedCount taskSharedCount;
   private transient SharedCount recoverySharedCount;
+  private volatile boolean isOpen = false;

Review Comment:
   ```suggestion
     private volatile boolean isOpen;
   ```
   
   No need to initialize explictly.



##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZkLockFactory.java:
##########
@@ -126,29 +159,46 @@ public void close() throws IOException {
       if (client != null) {
         client.close();
       }
+      isOpen = false;
     }
   }
 
   /** Zookeeper lock implementation */
   private static class ZkLock implements Lock {
     private final SharedCount sharedCount;
+    private final String lockId;
+    private final String lockType;
+    private final String lockPath;
+
+    private static final int LOCKED = 1;
+    private static final int UNLOCKED = 0;
 
-    private ZkLock(SharedCount sharedCount) {
+    private ZkLock(String lockId, String lockType, String lockPath, 
SharedCount sharedCount) {
+      this.lockId = lockId;
+      this.lockType = lockType;
+      this.lockPath = lockPath;
       this.sharedCount = sharedCount;
     }
 
     @Override
     public boolean tryLock() {
       VersionedValue<Integer> versionedValue = sharedCount.getVersionedValue();
       if (isHeld(versionedValue)) {
-        LOG.debug("Lock is already held for {}", this);
+        LOG.debug(
+            "Lock is already held for lockId: {}, type: {}, path: {}.", 
lockId, lockType, lockPath);
         return false;
       }
 
       try {
-        return sharedCount.trySetCount(versionedValue, LOCKED);
+        boolean acquired = sharedCount.trySetCount(versionedValue, LOCKED);
+        if (!acquired) {
+          LOG.warn(

Review Comment:
   +1 for DEBUG level logging here.



##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ZkLockFactory.java:
##########
@@ -66,6 +65,16 @@ public ZkLockFactory(
       int maxRetries) {
     Preconditions.checkNotNull(connectString, "Zookeeper connection string 
cannot be null");
     Preconditions.checkNotNull(lockId, "Lock ID cannot be null");
+    Preconditions.checkArgument(
+        sessionTimeoutMs > 0, "Session timeout must be positive, got: %s", 
sessionTimeoutMs);
+    Preconditions.checkArgument(
+        connectionTimeoutMs > 0,
+        "Connection timeout must be positive, got: %s",
+        connectionTimeoutMs);
+    Preconditions.checkArgument(
+        baseSleepTimeMs > 0, "Base sleep time must be positive, got: %s", 
baseSleepTimeMs);

Review Comment:
   I think `>= 0` might be correct for all of these. Zero is also allowed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to