This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit c7a0d28fc605f521c8597622a6eea442b34b3195 Author: Marcos Rico Peng <[email protected]> AuthorDate: Fri Jun 23 19:12:23 2023 -0400 Lattice MetaClient Distributed Semaphore Implementation (#2515) Co-authored-by: mapeng <[email protected]> --- .../recipes/lock/DistributedSemaphore.java | 123 ++++++++++++++++++--- .../helix/metaclient/recipes/lock/LockClient.java | 8 +- .../recipes/lock/DistributedSemaphoreTest.java | 105 ++++++++++++++++++ 3 files changed, 221 insertions(+), 15 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java index 0cccb130f..7b16e78fd 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java @@ -22,18 +22,45 @@ package org.apache.helix.metaclient.recipes.lock; import org.apache.commons.lang3.NotImplementedException; import org.apache.helix.metaclient.api.MetaClientInterface; import org.apache.helix.metaclient.datamodel.DataRecord; +import org.apache.helix.metaclient.exception.MetaClientException; import org.apache.helix.metaclient.factories.MetaClientConfig; +import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; +import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.TimeUnit; public class DistributedSemaphore { + private final MetaClientInterface<DataRecord> _metaClient; + private String _path; + private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY"; + private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY"; + private static final long DEFAULT_REMAINING_CAPACITY = -1; + private static final Logger LOG = LoggerFactory.getLogger(DistributedSemaphore.class); /** * Create a distributed semaphore client with the given configuration. * @param config configuration of the client */ public DistributedSemaphore(MetaClientConfig config) { - throw new NotImplementedException("Not implemented yet."); + if (config == null) { + throw new MetaClientException("Configuration cannot be null"); + } + LOG.info("Creating DistributedSemaphore Client"); + if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) { + ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder() + .setConnectionAddress(config.getConnectionAddress()) + .setZkSerializer(new DataRecordSerializer()) // Currently only support ZNRecordSerializer. + // Setting DataRecordSerializer as DataRecord extends ZNRecord. + .build(); + _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig); + _metaClient.connect(); + } else { + throw new MetaClientException("Unsupported store type: " + config.getStoreType()); + } } /** @@ -41,7 +68,17 @@ public class DistributedSemaphore { * @param client client to connect to */ public DistributedSemaphore(MetaClientInterface<DataRecord> client) { - throw new NotImplementedException("Not implemented yet."); + if (client == null) { + throw new MetaClientException("Client cannot be null"); + } + LOG.info("Connecting to existing DistributedSemaphore Client"); + _metaClient = client; + try { + _metaClient.connect(); + // TODO: Differentiate exception catch between already connected and already closed. + } catch (IllegalStateException e) { + // Ignore as it either has already been connected or already been closed. + } } /** @@ -50,7 +87,22 @@ public class DistributedSemaphore { * @param capacity capacity of the semaphore */ public void createSemaphore(String path, int capacity) { - throw new NotImplementedException("Not implemented yet."); + if (capacity <= 0) { + throw new MetaClientException("Capacity must be positive"); + } + if (path == null || path.isEmpty()) { + throw new MetaClientException("Invalid path to create semaphore"); + } + if (_metaClient.exists(path) != null) { + throw new MetaClientException("Semaphore already exists"); + } + if (_metaClient.exists(path) == null) { + DataRecord dataRecord = new DataRecord(path); + dataRecord.setLongField(INITIAL_CAPACITY_NAME, capacity); + dataRecord.setLongField(REMAINING_CAPACITY_NAME, capacity); + _metaClient.create(path, dataRecord); + _path = path; + } } /** @@ -58,7 +110,13 @@ public class DistributedSemaphore { * @param path path of the semaphore */ public void connectSemaphore(String path) { - throw new NotImplementedException("Not implemented yet."); + if (path == null || path.isEmpty()) { + throw new MetaClientException("Invalid path to connect semaphore"); + } + if (_metaClient.exists(path) == null) { + throw new MetaClientException("Semaphore does not exist"); + } + _path = path; } /** @@ -66,7 +124,13 @@ public class DistributedSemaphore { * @return a permit */ public Permit acquire() { - throw new NotImplementedException("Not implemented yet."); + try { + updateAcquirePermit(1); + return retrievePermit(_path); + } catch (MetaClientException e) { + LOG.error("Failed to acquire permit.", e); + return null; + } } @@ -76,7 +140,17 @@ public class DistributedSemaphore { * @return a collection of permits */ public Collection<Permit> acquire(int count) { - throw new NotImplementedException("Not implemented yet."); + try { + updateAcquirePermit(count); + Collection<Permit> permits = new ArrayList<>(); + for (int i = 0; i < count; i++) { + permits.add(retrievePermit(_path)); + } + return permits; + } catch (MetaClientException e) { + LOG.error("Failed to acquire permits.", e); + return null; + } } /** @@ -96,7 +170,7 @@ public class DistributedSemaphore { * @return remaining capacity */ public long getRemainingCapacity() { - throw new NotImplementedException("Not implemented yet."); + return getSemaphore().getLongField(REMAINING_CAPACITY_NAME, DEFAULT_REMAINING_CAPACITY); } /** @@ -104,14 +178,22 @@ public class DistributedSemaphore { * @return semaphore data record */ private DataRecord getSemaphore() { - throw new NotImplementedException("Not implemented yet."); + if (_metaClient.exists(_path) == null) { + throw new MetaClientException("Semaphore does not exist at path: " + _path + ". Please create it first."); + } + return new DataRecord(_metaClient.get(_path)); } /** * Return a permit. If the permit is already returned, log and return void. */ public void returnPermit(Permit permit) { - throw new NotImplementedException("Not implemented yet."); + if (permit.isReleased()) { + LOG.info("The permit has already been released"); + } else { + updateReturnPermit(); + permit.releasePermit(); + } } /** @@ -119,7 +201,9 @@ public class DistributedSemaphore { * log and return void. */ public void returnAllPermits(Collection<Permit> permits) { - throw new NotImplementedException("Not implemented yet."); + for (Permit permit : permits) { + returnPermit(permit); + } } /** @@ -128,7 +212,8 @@ public class DistributedSemaphore { * @return a permit */ private Permit retrievePermit(String path) { - throw new NotImplementedException("Not implemented yet."); + MetaClientInterface.Stat stat = _metaClient.exists(path); + return new Permit(getSemaphore(), stat); } /** @@ -136,13 +221,25 @@ public class DistributedSemaphore { * @param count number of permits to acquire */ private void updateAcquirePermit(int count) { - throw new NotImplementedException("Not implemented yet."); + _metaClient.update(_path, record -> { + long permitsAvailable = record.getLongField(REMAINING_CAPACITY_NAME, DEFAULT_REMAINING_CAPACITY); + if (permitsAvailable < count) { + throw new MetaClientException("No sufficient permits available. Attempt to acquire " + count + " permits, but only " + + permitsAvailable + " permits available"); + } + record.setLongField(REMAINING_CAPACITY_NAME, permitsAvailable - count); + return record; + }); } /** * Update the remaining capacity of the semaphore after returning a permit. */ private void updateReturnPermit() { - throw new NotImplementedException("Not implemented yet."); + _metaClient.update(_path, record -> { + long permitsAvailable = record.getLongField(REMAINING_CAPACITY_NAME, DEFAULT_REMAINING_CAPACITY); + record.setLongField(REMAINING_CAPACITY_NAME, permitsAvailable + 1); + return record; + }); } } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java index cd5c3c297..8a9c489e3 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/LockClient.java @@ -64,8 +64,12 @@ public class LockClient implements LockClientInterface, AutoCloseable { throw new IllegalArgumentException("MetaClient cannot be null."); } _metaClient = client; - LOG.info("Connecting to existing MetaClient for LockClient"); - _metaClient.connect(); + try { + LOG.info("Connecting to existing MetaClient for LockClient"); + _metaClient.connect(); + } catch (IllegalStateException e) { + // Ignore as it either has already been connected or already been closed. + } } @Override diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphoreTest.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphoreTest.java new file mode 100644 index 000000000..db874cd33 --- /dev/null +++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphoreTest.java @@ -0,0 +1,105 @@ +package org.apache.helix.metaclient.recipes.lock; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.metaclient.factories.MetaClientConfig; +import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.Collection; + +public class DistributedSemaphoreTest extends ZkMetaClientTestBase { + + public DistributedSemaphore createSemaphoreClientAndSemaphore(String path, int capacity) { + + MetaClientConfig.StoreType storeType = MetaClientConfig.StoreType.ZOOKEEPER; + MetaClientConfig config = new MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR) + .setStoreType(storeType).build(); + DistributedSemaphore client = new DistributedSemaphore(config); + client.createSemaphore(path, capacity); + return client; + } + + @Test + public void testAcquirePermit() { + final String key = "/TestSemaphore_testAcquirePermit"; + int capacity = 5; + DistributedSemaphore semaphoreClient = createSemaphoreClientAndSemaphore(key, capacity); + Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity); + + Permit permit = semaphoreClient.acquire(); + Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity - 1); + } + + @Test + public void testAcquireMultiplePermits() { + final String key = "/TestSemaphore_testAcquireMultiplePermits"; + int capacity = 5; + int count = 4; + DistributedSemaphore semaphoreClient = createSemaphoreClientAndSemaphore(key, capacity); + Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity); + + Collection<Permit> permits = semaphoreClient.acquire(count); + Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity - 4); + Assert.assertEquals(permits.size(), count); + Assert.assertNull(semaphoreClient.acquire(count)); + } + + @Test + public void testReturnPermit() { + final String key = "/TestSemaphore_testReturnPermit"; + int capacity = 5; + DistributedSemaphore semaphoreClient = createSemaphoreClientAndSemaphore(key, capacity); + Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity); + + Permit permit = semaphoreClient.acquire(); + Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity - 1); + + semaphoreClient.returnPermit(permit); + Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity); + + // return the same permit again. Should not fail but capacity remains same. + semaphoreClient.returnPermit(permit); + Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity); + } + + @Test + public void testReturnMultiplePermits() { + final String key = "/TestSemaphore_testReturnMultiplePermits"; + int capacity = 5; + int count = 4; + DistributedSemaphore semaphoreClient = createSemaphoreClientAndSemaphore(key, capacity); + Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity); + + Collection<Permit> permits = semaphoreClient.acquire(count); + Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity - 4); + Assert.assertEquals(permits.size(), count); + + semaphoreClient.returnAllPermits(permits); + Assert.assertEquals(semaphoreClient.getRemainingCapacity(), capacity); + } + + @Test + public void testTryAcquirePermit() { + // Not implemented + } + +}
