This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-gateway-service by this
push:
new fc4a2e120 Gateway util for per key lock and per key blocking queue
executor(#2847)
fc4a2e120 is described below
commit fc4a2e1206ba952b39ec91b88310dd82a6b6c281
Author: xyuanlu <[email protected]>
AuthorDate: Wed Jul 24 12:23:02 2024 -0700
Gateway util for per key lock and per key blocking queue executor(#2847)
Gateway util for per key lock and per key blocking queue executor
---
.../helix/gateway/util/PerKeyBlockingExecutor.java | 80 ++++++++++++++++++++++
.../helix/gateway/util/PerKeyLockRegistry.java | 51 ++++++++++++++
.../helix/gateway/TestPerKeyBlockingExecutor.java | 44 ++++++++++++
.../helix/gateway/TestPerKeyLockRegistry.java | 55 +++++++++++++++
4 files changed, 230 insertions(+)
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java
new file mode 100644
index 000000000..7953996e9
--- /dev/null
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyBlockingExecutor.java
@@ -0,0 +1,80 @@
+package org.apache.helix.gateway.util;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * A per-key blocking executor that ensures that only one event is running for
a given key at a time.
+ */
+public class PerKeyBlockingExecutor {
+ private final ThreadPoolExecutor _executor;
+ private final Map<String, Queue<Runnable>> _pendingBlockedEvents;
+ private final ConcurrentHashMap.KeySetView<String, Boolean> _runningEvents;
+ private final Lock _queueLock;
+
+ public PerKeyBlockingExecutor(int maxWorkers) {
+ this._executor = (ThreadPoolExecutor)
Executors.newFixedThreadPool(maxWorkers);
+ this._pendingBlockedEvents = new HashMap<>();
+ this._queueLock = new ReentrantLock();
+ this._runningEvents = ConcurrentHashMap.newKeySet();
+ }
+
+ /**
+ * Offer an event to be executed. If an event is already running for the
given key, the event will be queued.
+ * @param key
+ * @param event
+ */
+ public void offerEvent(String key, Runnable event) {
+ _queueLock.lock();
+ try {
+ if (!_runningEvents.contains(key)) {
+ _executor.execute(() -> runEvent(key, event));
+ } else {
+ _pendingBlockedEvents.computeIfAbsent(key, k -> new
ConcurrentLinkedQueue<>());
+ _pendingBlockedEvents.get(key).offer(event);
+ }
+ } finally {
+ _queueLock.unlock();
+ }
+ }
+
+ private void runEvent(String key, Runnable event) {
+ try {
+ _runningEvents.add(key);
+ event.run();
+ } finally {
+ _queueLock.lock();
+ try {
+ _runningEvents.remove(key);
+ processQueue(key);
+ } finally {
+ _queueLock.unlock();
+ }
+ }
+ }
+
+ private void processQueue(String key) {
+ if (!_pendingBlockedEvents.containsKey(key)) {
+ return;
+ }
+ Runnable event = _pendingBlockedEvents.get(key).poll();
+ if (event != null) {
+ _executor.execute(() -> runEvent(key, event));
+ }
+ }
+
+ public void shutdown() {
+ _executor.shutdown();
+ }
+
+}
diff --git
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyLockRegistry.java
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyLockRegistry.java
new file mode 100644
index 000000000..5bfc3c949
--- /dev/null
+++
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PerKeyLockRegistry.java
@@ -0,0 +1,51 @@
+package org.apache.helix.gateway.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A registry that manages locks per key.
+ */
+public class PerKeyLockRegistry {
+ private final ConcurrentHashMap<String, ReentrantLock> lockMap = new
ConcurrentHashMap<>();
+
+ public void lock(String key) {
+ ReentrantLock lock = lockMap.computeIfAbsent(key, k -> new
ReentrantLock());
+ lock.lock();
+ }
+
+ public void unlock(String key) {
+ ReentrantLock lock = lockMap.get(key);
+ if (lock != null) {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Execute the action with the lock on the key
+ * @param key
+ * @param action
+ */
+ public void withLock(String key, Runnable action) {
+ lock(key);
+ try {
+ action.run();
+ } finally {
+ unlock(key);
+ }
+ }
+
+ /**
+ * Remove the lock if it is not being used.
+ * it must be called after the lock is required
+ * @param key
+ */
+ public boolean removeLock(String key) {
+ ReentrantLock lock = lockMap.get(key);
+ if (lock != null && lock.isHeldByCurrentThread() &&
!lock.hasQueuedThreads()) {
+ lockMap.remove(key, lock);
+ return true;
+ }
+ return false;
+ }
+}
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java
new file mode 100644
index 000000000..457002cbd
--- /dev/null
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyBlockingExecutor.java
@@ -0,0 +1,44 @@
+package org.apache.helix.gateway;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+
+public class TestPerKeyBlockingExecutor {
+ @Test
+ public void testEventNotAddedIfPending() throws InterruptedException {
+ CountDownLatch latch1 = new CountDownLatch(1);
+ CountDownLatch latch2 = new CountDownLatch(1);
+ CountDownLatch latch3 = new CountDownLatch(1);
+
+ PerKeyBlockingExecutor perKeyBlockingExecutor = new
PerKeyBlockingExecutor(3);
+
+ perKeyBlockingExecutor.offerEvent("key1", () -> {
+ try {
+ latch1.await(); // Wait for the test to release this latch
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+
+ perKeyBlockingExecutor.offerEvent("key1", () -> {
+ latch2.countDown();
+ });
+
+ Thread.sleep(100); // Give time for the second event to be potentially
processed
+
+ Assert.assertFalse(latch2.await(100, TimeUnit.MILLISECONDS)); // Event 2
should not run yet
+ latch1.countDown(); // Release the first latch
+ Assert.assertTrue(latch2.await(1, TimeUnit.SECONDS)); // Event 2 should
run now
+
+ perKeyBlockingExecutor.offerEvent("key1", () -> {
+ latch3.countDown();
+ });
+
+ Assert.assertTrue(latch3.await(1, TimeUnit.SECONDS)); // Event 3 should
run after Event 2
+ perKeyBlockingExecutor.shutdown();
+ }
+}
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java
new file mode 100644
index 000000000..89d2e0bc1
--- /dev/null
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/TestPerKeyLockRegistry.java
@@ -0,0 +1,55 @@
+package org.apache.helix.gateway;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.helix.gateway.util.PerKeyLockRegistry;
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+
+public class TestPerKeyLockRegistry {
+ @Test
+ public void testConcurrentAccess() {
+ PerKeyLockRegistry lockRegistry = new PerKeyLockRegistry();
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch doneLatch = new CountDownLatch(2);
+
+ lockRegistry.withLock("key1", () -> {
+ counter.incrementAndGet();
+ // try to acquir the lock for another key
+ lockRegistry.withLock("key2", () -> {
+ counter.incrementAndGet();
+ });
+ });
+
+ // counter should be 2
+ Assert.assertEquals(2, counter.get());
+
+ // acquire the lock for key
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ lockRegistry.lock("key1");
+ executor.submit(() -> {
+ lockRegistry.withLock("key1", () -> {
+ //try remove lock
+ Assert.assertFalse(lockRegistry.removeLock("key1"));
+ });
+ });
+ lockRegistry.unlock("key1");
+ executor.submit(() -> {
+ lockRegistry.withLock("key2", () -> {
+ //try remove lock, should fail because key1 is not locked
+ Assert.assertFalse(lockRegistry.removeLock("key1"));
+ });
+ });
+ executor.submit(() -> {
+ lockRegistry.withLock("key1", () -> {
+ //try remove lock, only this tiem it succeded
+ Assert.assertFalse(lockRegistry.removeLock("key1"));
+ });
+ });
+ executor.shutdown();
+ }
+}