vjagadish1989 commented on a change in pull request #938: SAMZA-1531: Support
run.id in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r286735441
##########
File path: samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java
##########
@@ -18,93 +18,84 @@
*/
package org.apache.samza.zk;
+import java.time.Duration;
import java.util.List;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.samza.SamzaException;
-import org.apache.samza.coordinator.DistributedLockWithState;
+import org.apache.samza.coordinator.DistributedLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Distributed lock primitive for Zookeeper.
*/
-public class ZkDistributedLock implements DistributedLockWithState {
+public class ZkDistributedLock implements DistributedLock {
public static final Logger LOG =
LoggerFactory.getLogger(ZkDistributedLock.class);
private static final String STATE_INITED = "sate_initialized";
private final ZkUtils zkUtils;
private final String lockPath;
private final String participantId;
private final ZkKeyBuilder keyBuilder;
- private final Random random = new Random();
private String nodePath = null;
- private final String statePath;
+ private Object mutex;
public ZkDistributedLock(String participantId, ZkUtils zkUtils, String
lockId) {
this.zkUtils = zkUtils;
this.participantId = participantId;
this.keyBuilder = zkUtils.getKeyBuilder();
- lockPath = String.format("%s/stateLock_%s", keyBuilder.getRootPath(),
lockId);
- statePath = String.format("%s/%s_%s", lockPath, STATE_INITED, lockId);
+ lockPath = String.format("%s/lock_%s", keyBuilder.getRootPath(), lockId);
zkUtils.validatePaths(new String[] {lockPath});
+ mutex = new Object();
+ zkUtils.getZkClient().subscribeChildChanges(lockPath, new
ParticipantChangeHandler(zkUtils));
}
/**
* Tries to acquire a lock in order to create intermediate streams. On
failure to acquire lock, it keeps trying until the lock times out.
* Creates a sequential ephemeral node to acquire the lock. If the path of
this node has the lowest sequence number, the processor has acquired the lock.
* @param timeout Duration of lock acquiring timeout.
- * @param unit Unit of the timeout defined above.
- * @return true if lock is acquired successfully, false if it times out.
+ * @return true if lock is acquired successfully else returns false if
failed to acquire within timeout
*/
@Override
- public boolean lockIfNotSet(long timeout, TimeUnit unit)
- throws TimeoutException {
+ public boolean lock(Duration timeout) {
nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/",
participantId);
//Start timer for timeout
long startTime = System.currentTimeMillis();
- long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
+ long lockTimeout = timeout.toMillis();
while ((System.currentTimeMillis() - startTime) < lockTimeout) {
+ synchronized (mutex) {
Review comment:
Why change the existing implementation that relies on Thread.sleep? If
it's a simple enough operation, sleep + re-poll may be more understandable than
a mutex + callback combo.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services