sborya commented on a change in pull request #938: SAMZA-1531: Support run.id 
in standalone for batch processing.
URL: https://github.com/apache/samza/pull/938#discussion_r264475902
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/zk/ZkDistributedReadWriteLock.java
 ##########
 @@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.DistributedReadWriteLock;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Distributed lock primitive for Zookeeper.
+ */
+public class ZkDistributedReadWriteLock implements DistributedReadWriteLock {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(ZkDistributedReadWriteLock.class);
+  private static final String PARTICIPANTS_PATH = "participants";
+  private static final String PROCESSORS_PATH = "processors";
+  private final ZkUtils zkUtils;
+  private final String lockPath;
+  private final String particpantsPath;
+  private final String processorsPath;
+  private final String participantId;
+  private final ZkKeyBuilder keyBuilder;
+  private final Random random = new Random();
+  private String activeParticipantPath = null;
+  private String activeProcessorPath = null;
+  private Object mutex;
+  private Boolean isInCriticalSection = false;
+  private Boolean isStateLost = false;
+
+  public ZkDistributedReadWriteLock(String participantId, ZkUtils zkUtils, 
String lockId) {
+    if (zkUtils == null) {
+      throw new RuntimeException("Cannot operate ZkDistributedReadWriteLock 
without ZkUtils.");
+    }
+    this.zkUtils = zkUtils;
+    this.participantId = participantId;
+    this.keyBuilder = zkUtils.getKeyBuilder();
+    lockPath = String.format("%s/readWriteLock-%s", 
keyBuilder.getRootPath(),lockId);
+    particpantsPath = String.format("%s/%s", lockPath, PARTICIPANTS_PATH);
+    processorsPath = String.format("%s/%s", lockPath, PROCESSORS_PATH);
+    zkUtils.validatePaths(new String[] {lockPath, particpantsPath, 
processorsPath});
+    mutex = new Object();
+    zkUtils.getZkClient().subscribeChildChanges(particpantsPath, new 
ParticipantChangeHandler(zkUtils));
+    zkUtils.getZkClient().subscribeStateChanges(new 
ZkSessionStateChangedListener());
+  }
+
+  /**
+   * Tries to acquire a lock in order to generate run.id. On failure to 
acquire lock, it keeps trying until the lock times out.
+   * Creates a sequential ephemeral node under "participants" to acquire the 
lock.
+   * If the path of this node has the lowest sequence number,
+   * it creates a sequential ephemeral node under "processors" and checks the 
number of nodes under "processors"
+   * if there is only one node under "processors", a WRITE access lock is 
acquired
+   * else a READ access lock is acquired
+   * @param timeout Duration of lock acquiring timeout.
+   * @param unit Unit of the timeout defined above.
+   * @return AccessType.READ/WRITE if lock is acquired successfully, 
AccessType.NONE if it times out.
+   */
+  @Override
+  public AccessType lock(long timeout, TimeUnit unit)
+      throws TimeoutException {
+
+    activeParticipantPath = 
zkUtils.getZkClient().createEphemeralSequential(particpantsPath + "/", 
participantId);
+
+    //Start timer for timeout
+    long startTime = System.currentTimeMillis();
+    long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
+
+    while((System.currentTimeMillis() - startTime) < lockTimeout) {
+      synchronized (mutex) {
+        AccessType accessType = checkAndAcquireLock();
+        if(accessType != AccessType.NONE) {
+          isInCriticalSection = true;
+          if(isStateLost) {
+            throw new SamzaException("Lock's state lost due to connection 
expiry");
+          }
+          return accessType;
+        } else {
 
 Review comment:
   when this happens?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to