This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 34f826094a045fa0c323ef83da99433ac30fd495
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Fri Sep 6 16:34:53 2019 -0700

    Add BucketDataAccessor for large writes

    For the new WAGED rebalancer, it's necessary to have a data accessor
    that allows writes of data exceeding 1MB. ZooKeeper's ZNode size is
    capped at 1MB, so the BucketDataAccessor interface and its
    ZkBucketDataAccessor implementation help us achieve this.

    Changelist:
    1. Add BucketDataAccessor and ZkBucketDataAccessor
    2. Add necessary serializers
    3. Add an integration test against ZK
---
 .../java/org/apache/helix/BucketDataAccessor.java  |  53 ++++
 .../manager/zk/ZNRecordJacksonSerializer.java      |  67 +++++
 .../helix/manager/zk/ZkBucketDataAccessor.java     | 326 +++++++++++++++++++++
 .../helix/manager/zk/TestZkBucketDataAccessor.java | 233 +++++++++++++++
 4 files changed, 679 insertions(+)

diff --git a/helix-core/src/main/java/org/apache/helix/BucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BucketDataAccessor.java
new file mode 100644
index 0000000..2008c23
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/BucketDataAccessor.java
@@ -0,0 +1,53 @@
+package org.apache.helix;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+public interface BucketDataAccessor {
+
+  /**
+   * Write a HelixProperty in buckets, compressed.
+   * @param path path to which the metadata will be written
+   * @param value HelixProperty to write
+   * @param <T> the subtype of HelixProperty being written
+   * @throws IOException
+   */
+  <T extends HelixProperty> boolean compressedBucketWrite(String path, T value) throws IOException;
+
+  /**
+   * Read a HelixProperty that was written in buckets, compressed.
+   * @param path path from which the data will be read
+   * @param helixPropertySubType the subtype of HelixProperty the data was written in
+   * @param <T> the subtype of HelixProperty being read
+   */
+  <T extends HelixProperty> HelixProperty compressedBucketRead(String path,
+      Class<T> helixPropertySubType);
+
+  /**
+   * Delete the HelixProperty in the given path.
+   * @param path path under which the data will be deleted
+   */
+  void compressedBucketDelete(String path);
+
+  /**
+   * Close the connection to the metadata store.
+   */
+  void disconnect();
+}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java
new file mode 100644
index 0000000..989017a
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZNRecordJacksonSerializer.java
@@ -0,0 +1,67 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * ZNRecordJacksonSerializer serializes ZNRecord objects into a byte array using Jackson's
+ * ObjectMapper. Note that this serializer doesn't check the size of the resulting binary.
+ */
+public class ZNRecordJacksonSerializer implements ZkSerializer {
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  @Override
+  public byte[] serialize(Object record) throws ZkMarshallingError {
+    if (!(record instanceof ZNRecord)) {
+      // null is NOT an instance of any class
+      throw new HelixException("Input object is not of type ZNRecord (was " + record + ")");
+    }
+    ZNRecord znRecord = (ZNRecord) record;
+
+    try {
+      return OBJECT_MAPPER.writeValueAsBytes(znRecord);
+    } catch (IOException e) {
+      throw new HelixException(
+          String.format("Exception during serialization. ZNRecord id: %s", znRecord.getId()), e);
+    }
+  }
+
+  @Override
+  public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+    if (bytes == null || bytes.length == 0) {
+      // reading a parent/null node
+      return null;
+    }
+
+    ZNRecord record;
+    try {
+      record = OBJECT_MAPPER.readValue(bytes, ZNRecord.class);
+    } catch (IOException e) {
+      throw new HelixException("Exception during deserialization!", e);
+    }
+    return record;
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
new file mode 100644
index 0000000..24c7c8e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -0,0 +1,326 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.BucketDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
+import org.apache.helix.util.GZipCompressionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
+  private static Logger LOG = LoggerFactory.getLogger(ZkBucketDataAccessor.class);
+
+  private static final int DEFAULT_NUM_VERSIONS = 2;
+  private static final String BUCKET_SIZE_KEY = "BUCKET_SIZE";
+  private static final String DATA_SIZE_KEY = "DATA_SIZE";
+  private static final String WRITE_LOCK_KEY = "WRITE_LOCK";
+  private static final String LAST_SUCCESS_KEY = "LAST_SUCCESS";
+
+  // 50 KB for the default bucket size
+  private static final int DEFAULT_BUCKET_SIZE = 50 * 1024;
+  private final int _bucketSize;
+  private final int _numVersions;
+  private ZkSerializer _zkSerializer;
+  private HelixZkClient _zkClient;
+  private HelixZkClient _znRecordClient;
+  private BaseDataAccessor _zkBaseDataAccessor;
+  private BaseDataAccessor<ZNRecord> _znRecordBaseDataAccessor;
+
+  /**
+   * Constructor that allows a custom bucket size.
+   * @param zkAddr ZooKeeper address to connect to
+   * @param bucketSize size of each bucket in bytes
+   * @param numVersions number of versions to store in ZK
+   */
+  public ZkBucketDataAccessor(String zkAddr, int bucketSize, int numVersions) {
+    // There are two HelixZkClients:
+    // 1. _zkBaseDataAccessor for writes of binary data
+    // 2. _znRecordBaseDataAccessor for writes of ZNRecord (metadata)
+    _zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+    _zkClient.setZkSerializer(new ZkSerializer() {
+      @Override
+      public byte[] serialize(Object data) throws ZkMarshallingError {
+        if (data instanceof byte[]) {
+          return (byte[]) data;
+        }
+        throw new HelixException("ZkBucketDataAccessor only supports a byte array as an argument!");
+      }
+
+      @Override
+      public Object deserialize(byte[] data) throws ZkMarshallingError {
+        return data;
+      }
+    });
+    _zkBaseDataAccessor = new ZkBaseDataAccessor(_zkClient);
+
+    // TODO: Optimize serialization with Jackson
+    // TODO: Or use a better binary serialization protocol
+    // TODO: Consider making this also binary
+    // TODO: Consider an async write for the metadata as well
+    _znRecordClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+    _znRecordBaseDataAccessor = new ZkBaseDataAccessor<>(_znRecordClient);
+    _znRecordClient.setZkSerializer(new ZNRecordSerializer());
+
+    _zkSerializer = new ZNRecordJacksonSerializer();
+    _bucketSize = bucketSize;
+    _numVersions = numVersions;
+  }
+
+  /**
+   * Constructor that uses a default bucket size.
+   * @param zkAddr ZooKeeper address to connect to
+   */
+  public ZkBucketDataAccessor(String zkAddr) {
+    this(zkAddr, DEFAULT_BUCKET_SIZE, DEFAULT_NUM_VERSIONS);
+  }
+
+  @Override
+  public <T extends HelixProperty> boolean compressedBucketWrite(String path, T value)
+      throws IOException {
+    // Take the ZNRecord and serialize it (get byte[])
+    byte[] serializedRecord = _zkSerializer.serialize(value.getRecord());
+    // Compress the byte[]
+    byte[] compressedRecord = GZipCompressionUtil.compress(serializedRecord);
+    // Compute N - number of buckets
+    int numBuckets = (compressedRecord.length + _bucketSize - 1) / _bucketSize;
+
+    if (tryLock(path)) {
+      try {
+        // Read or initialize metadata and compute the last success version index
+        ZNRecord metadataRecord =
+            _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
+        if (metadataRecord == null) {
+          metadataRecord = new ZNRecord(extractIdFromPath(path));
+        }
+        int lastSuccessIndex =
+            (metadataRecord.getIntField(LAST_SUCCESS_KEY, -1) + 1) % _numVersions;
+        String dataPath = path + "/" + lastSuccessIndex;
+
+        List<String> paths = new ArrayList<>();
+        List<Object> buckets = new ArrayList<>();
+
+        int ptr = 0;
+        int counter = 0;
+        while (counter < numBuckets) {
+          paths.add(dataPath + "/" + counter);
+          if (counter == numBuckets - 1) {
+            // Special treatment for the last bucket
+            buckets.add(Arrays.copyOfRange(compressedRecord, ptr,
+                ptr + compressedRecord.length % _bucketSize));
+          } else {
+            buckets.add(Arrays.copyOfRange(compressedRecord, ptr, ptr + _bucketSize));
+          }
+          ptr += _bucketSize;
+          counter++;
+        }
+
+        // Do a cleanup of previous data
+        if (!_zkBaseDataAccessor.remove(dataPath, AccessOption.PERSISTENT)) {
+          // Clean-up is not critical so upon failure, we log instead of throwing an exception
+          LOG.warn("Failed to clean up previous bucketed data in data path: {}", dataPath);
+        }
+
+        // Do an async set to ZK
+        boolean[] success =
+            _zkBaseDataAccessor.setChildren(paths, buckets, AccessOption.PERSISTENT);
+        // Throw an exception and fail the write if any bucket write failed
+        for (boolean s : success) {
+          if (!s) {
+            throw new HelixException(
+                String.format("Failed to write the data buckets for path: %s", path));
+          }
+        }
+
+        // Data write completed, so update the metadata with last success index
+        // Note that the metadata ZNode is written using a sync write
+        metadataRecord.setIntField(BUCKET_SIZE_KEY, _bucketSize);
+        metadataRecord.setLongField(DATA_SIZE_KEY, compressedRecord.length);
+        metadataRecord.setIntField(LAST_SUCCESS_KEY, lastSuccessIndex);
+        if (!_znRecordBaseDataAccessor.set(path, metadataRecord, AccessOption.PERSISTENT)) {
+          throw new HelixException(
+              String.format("Failed to write the metadata at path: %s!", path));
+        }
+      } finally {
+        // Critical section for write ends here
+        unlock(path);
+      }
+      return true;
+    }
+    throw new HelixException(String.format("Could not acquire lock for write. Path: %s", path));
+  }
+
+  @Override
+  public <T extends HelixProperty> HelixProperty compressedBucketRead(String path,
+      Class<T> helixPropertySubType) {
+    return helixPropertySubType.cast(compressedBucketRead(path));
+  }
+
+  @Override
+  public void compressedBucketDelete(String path) {
+    if (!_zkBaseDataAccessor.remove(path, AccessOption.PERSISTENT)) {
+      throw new HelixException(String.format("Failed to delete the bucket data!
Path: %s", path)); + } + } + + @Override + public void disconnect() { + if (!_zkClient.isClosed()) { + _zkClient.close(); + } + if (!_znRecordClient.isClosed()) { + _znRecordClient.close(); + } + } + + private HelixProperty compressedBucketRead(String path) { + // TODO: Incorporate parallelism into reads instead of locking the whole thing against other + // reads and writes + if (tryLock(path)) { + try { + // Retrieve the metadata + ZNRecord metadataRecord = + _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT); + if (metadataRecord == null) { + throw new HelixException( + String.format("Metadata ZNRecord does not exist for path: %s", path)); + } + + int bucketSize = metadataRecord.getIntField(BUCKET_SIZE_KEY, -1); + int dataSize = metadataRecord.getIntField(DATA_SIZE_KEY, -1); + int lastSuccessIndex = metadataRecord.getIntField(LAST_SUCCESS_KEY, -1); + if (lastSuccessIndex == -1) { + throw new HelixException(String.format("Metadata ZNRecord does not have %s! Path: %s", + LAST_SUCCESS_KEY, path)); + } + if (bucketSize == -1) { + throw new HelixException( + String.format("Metadata ZNRecord does not have %s! Path: %s", BUCKET_SIZE_KEY, path)); + } + if (dataSize == -1) { + throw new HelixException( + String.format("Metadata ZNRecord does not have %s! Path: %s", DATA_SIZE_KEY, path)); + } + + // Compute N - number of buckets + int numBuckets = (dataSize + _bucketSize - 1) / _bucketSize; + byte[] compressedRecord = new byte[dataSize]; + String dataPath = path + "/" + lastSuccessIndex; + + List<String> paths = new ArrayList<>(); + for (int i = 0; i < numBuckets; i++) { + paths.add(dataPath + "/" + i); + } + + // Async get + List buckets = _zkBaseDataAccessor.get(paths, null, AccessOption.PERSISTENT, true); + + // Combine buckets into one byte array + int copyPtr = 0; + for (int i = 0; i < numBuckets; i++) { + if (i == numBuckets - 1) { + // Special treatment for the last bucket + System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, dataSize % bucketSize); + } else { + System.arraycopy(buckets.get(i), 0, compressedRecord, copyPtr, bucketSize); + copyPtr += bucketSize; + } + } + + // Decompress the byte array + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(compressedRecord); + byte[] serializedRecord; + try { + serializedRecord = GZipCompressionUtil.uncompress(byteArrayInputStream); + } catch (IOException e) { + throw new HelixException(String.format("Failed to decompress path: %s!", path), e); + } + + // Deserialize the record to retrieve the original + ZNRecord originalRecord = (ZNRecord) _zkSerializer.deserialize(serializedRecord); + return new HelixProperty(originalRecord); + } finally { + // Critical section for read ends here + unlock(path); + } + } + throw new HelixException(String.format("Could not acquire lock for read. Path: %s", path)); + } + + /** + * Returns the last string element in a split String array by /. + * @param path + * @return + */ + private String extractIdFromPath(String path) { + String[] splitPath = path.split("/"); + return splitPath[splitPath.length - 1]; + } + + /** + * Acquires the lock (create an ephemeral node) only if it is free (no ephemeral node already + * exists) at the time of invocation. 
+ * @param path + * @return + */ + private boolean tryLock(String path) { + // Check if another write is taking place and if not, create an ephemeral node to simulate + // acquiring of a lock + return !_zkBaseDataAccessor.exists(path + "/" + WRITE_LOCK_KEY, AccessOption.EPHEMERAL) + && _zkBaseDataAccessor.set(path + "/" + WRITE_LOCK_KEY, new byte[0], + AccessOption.EPHEMERAL); + } + + /** + * Releases the lock (removes the ephemeral node). + * @param path + */ + private void unlock(String path) { + // Write succeeded, so release the lock + if (!_zkBaseDataAccessor.remove(path + "/" + WRITE_LOCK_KEY, AccessOption.EPHEMERAL)) { + throw new HelixException(String.format("Could not remove ephemeral node for path: %s", path)); + } + // TODO: In case of remove failure, we risk a lock never getting released. + // TODO: Consider two possible improvements + // TODO: 1. Use ephemeral owner id for the same connection to reclaim the lock + // TODO: 2. Use "lease" - lock with a timeout + } + + @Override + public void close() throws Exception { + disconnect(); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java new file mode 100644 index 0000000..4c28835 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java @@ -0,0 +1,233 @@ +package org.apache.helix.manager.zk; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.apache.helix.AccessOption; +import org.apache.helix.BucketDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.HelixProperty; +import org.apache.helix.TestHelper; +import org.apache.helix.ZNRecord; +import org.apache.helix.common.ZkTestBase; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestZkBucketDataAccessor extends ZkTestBase { + + private static final String PATH = "/" + TestHelper.getTestClassName(); + private static final String NAME_KEY = TestHelper.getTestClassName(); + private static final String LAST_SUCCESS_KEY = "LAST_SUCCESS"; + private static final String BUCKET_SIZE_KEY = "BUCKET_SIZE"; + private static final String WRITE_LOCK_KEY = "WRITE_LOCK"; + + // Populate list and map fields for content comparison + private static final List<String> LIST_FIELD = ImmutableList.of("1", "2"); + private static final Map<String, String> MAP_FIELD = ImmutableMap.of("1", "2"); + + private BucketDataAccessor _bucketDataAccessor; + + @BeforeClass + public void beforeClass() { + _bucketDataAccessor = new ZkBucketDataAccessor(ZK_ADDR); + } + + @AfterClass + public void afterClass() { + _bucketDataAccessor.disconnect(); + } + + /** + * Attempt writing a simple HelixProperty using compressedBucketWrite. + * @throws IOException + */ + @Test + public void testCompressedBucketWrite() throws IOException { + ZNRecord record = new ZNRecord(NAME_KEY); + record.setSimpleField(NAME_KEY, NAME_KEY); + record.setListField(NAME_KEY, LIST_FIELD); + record.setMapField(NAME_KEY, MAP_FIELD); + Assert.assertTrue(_bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record))); + } + + /** + * The record written in {@link #testCompressedBucketWrite()} is the same record that was written. + */ + @Test(dependsOnMethods = "testCompressedBucketWrite") + public void testCompressedBucketRead() { + HelixProperty readRecord = _bucketDataAccessor.compressedBucketRead(PATH, HelixProperty.class); + Assert.assertEquals(readRecord.getRecord().getSimpleField(NAME_KEY), NAME_KEY); + Assert.assertEquals(readRecord.getRecord().getListField(NAME_KEY), LIST_FIELD); + Assert.assertEquals(readRecord.getRecord().getMapField(NAME_KEY), MAP_FIELD); + _bucketDataAccessor.compressedBucketDelete(PATH); + } + + /** + * Do 10 writes and check that there are 5 versions of the data. 
+ */ + @Test(dependsOnMethods = "testCompressedBucketRead") + public void testManyWritesWithVersionCounts() throws IOException { + int bucketSize = 50 * 1024; + int numVersions = 5; + int expectedLastSuccessfulIndex = 4; + String path = PATH + "2"; + ZNRecord record = new ZNRecord(NAME_KEY); + record.setSimpleField(NAME_KEY, NAME_KEY); + record.setListField(NAME_KEY, LIST_FIELD); + record.setMapField(NAME_KEY, MAP_FIELD); + + BucketDataAccessor bucketDataAccessor = + new ZkBucketDataAccessor(ZK_ADDR, bucketSize, numVersions); + for (int i = 0; i < 10; i++) { + bucketDataAccessor.compressedBucketWrite(path, new HelixProperty(record)); + } + + // Check that there are numVersions number of children under path + List<String> children = _baseAccessor.getChildNames(path, AccessOption.PERSISTENT); + Assert.assertEquals(children.size(), numVersions); + + // Check that last successful index is 4 (since we did 10 writes) + ZNRecord metadata = _baseAccessor.get(path, null, AccessOption.PERSISTENT); + Assert.assertEquals(metadata.getIntField(LAST_SUCCESS_KEY, -1), expectedLastSuccessfulIndex); + + // Clean up + bucketDataAccessor.compressedBucketDelete(path); + bucketDataAccessor.disconnect(); + } + + /** + * Write a HelixProperty with large number of entries using BucketDataAccessor and read it back. + */ + @Test(dependsOnMethods = "testManyWritesWithVersionCounts") + public void testLargeWriteAndRead() throws IOException { + String name = "largeResourceAssignment"; + HelixProperty property = createLargeHelixProperty(name, 100000); + + // Perform large write + long before = System.currentTimeMillis(); + _bucketDataAccessor.compressedBucketWrite("/" + name, property); + long after = System.currentTimeMillis(); + System.out.println("Write took " + (after - before) + " ms"); + + // Read it back + before = System.currentTimeMillis(); + HelixProperty readRecord = + _bucketDataAccessor.compressedBucketRead("/" + name, HelixProperty.class); + after = System.currentTimeMillis(); + System.out.println("Read took " + (after - before) + " ms"); + + // Check against the original HelixProperty + Assert.assertEquals(readRecord, property); + } + + /** + * Tests that each write cleans up previous bucketed data. This method writes some small amount of + * data and checks that the data buckets from the large write performed in the previous test + * method have been cleaned up. 
+ * @throws IOException + */ + @Test(dependsOnMethods = "testLargeWriteAndRead") + public void testCleanupBeforeWrite() throws IOException { + // Create a HelixProperty of a very small size with the same name as the large HelixProperty + // created from the previous method + String name = "largeResourceAssignment"; + HelixProperty property = new HelixProperty(name); + property.getRecord().setIntField("Hi", 10); + + // Get the bucket count from the write performed in the previous method + ZNRecord metadata = _baseAccessor.get("/" + name, null, AccessOption.PERSISTENT); + int origBucketSize = metadata.getIntField(BUCKET_SIZE_KEY, -1); + + // Perform a write twice to overwrite both versions + _bucketDataAccessor.compressedBucketWrite("/" + name, property); + _bucketDataAccessor.compressedBucketWrite("/" + name, property); + + // Check that the children count for version 0 (version for the large write) is 1 + Assert.assertEquals( + _baseAccessor.getChildNames("/" + name + "/0", AccessOption.PERSISTENT).size(), 1); + + // Clean up + _bucketDataAccessor.compressedBucketDelete("/" + name); + } + + /** + * Test that no concurrent reads and writes are allowed by triggering multiple operations after + * creating an artificial lock. + * @throws IOException + */ + @Test(dependsOnMethods = "testCleanupBeforeWrite") + public void testFailureToAcquireLock() throws Exception { + String name = "acquireLock"; + // Use a large HelixProperty to simulate a write that keeps the lock for some time + HelixProperty property = createLargeHelixProperty(name, 100); + + // Artificially create the ephemeral ZNode + _baseAccessor.create("/" + name + "/" + WRITE_LOCK_KEY, new ZNRecord(name), + AccessOption.EPHEMERAL); + + // Test write + try { + _bucketDataAccessor.compressedBucketWrite("/" + name, property); + Assert.fail("Should fail due to an already-existing lock ZNode!"); + } catch (HelixException e) { + // Expect an exception + } + + // Test read + try { + _bucketDataAccessor.compressedBucketRead("/" + name, HelixProperty.class); + Assert.fail("Should fail due to an already-existing lock ZNode!"); + } catch (HelixException e) { + // Expect an exception + } + + // Clean up + _bucketDataAccessor.compressedBucketDelete("/" + name); + } + + private HelixProperty createLargeHelixProperty(String name, int numEntries) { + HelixProperty property = new HelixProperty(name); + for (int i = 0; i < numEntries; i++) { + // Create a random string every time + byte[] arrayKey = new byte[20]; + byte[] arrayVal = new byte[20]; + new Random().nextBytes(arrayKey); + new Random().nextBytes(arrayVal); + String randomStrKey = new String(arrayKey, StandardCharsets.UTF_8); + String randomStrVal = new String(arrayVal, StandardCharsets.UTF_8); + + // Dummy mapField + Map<String, String> mapField = new HashMap<>(); + mapField.put(randomStrKey, randomStrVal); + + property.getRecord().setMapField(randomStrKey, mapField); + } + return property; + } +}
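
For reference, a minimal usage sketch of the new accessor follows. The ZooKeeper address, ZNode path, and record name here are placeholders rather than values from this patch; the calls shown (compressedBucketWrite, compressedBucketRead, compressedBucketDelete, disconnect) are the interface methods introduced above.

    import org.apache.helix.HelixProperty;
    import org.apache.helix.ZNRecord;
    import org.apache.helix.manager.zk.ZkBucketDataAccessor;

    public class BucketDataAccessorUsageSketch {
      public static void main(String[] args) throws Exception {
        // Placeholder ZK address; the tests in this patch use ZkTestBase's ZK_ADDR instead
        ZkBucketDataAccessor accessor = new ZkBucketDataAccessor("localhost:2181");
        try {
          // Any HelixProperty works; large resource assignments are the intended payload
          ZNRecord record = new ZNRecord("exampleProperty");
          record.setSimpleField("field", "value");

          // Write: serialize -> gzip -> split into bucket ZNodes under /exampleProperty/<version>
          accessor.compressedBucketWrite("/exampleProperty", new HelixProperty(record));

          // Read: load the metadata, fetch the buckets of the last successful version, reassemble
          HelixProperty readBack =
              accessor.compressedBucketRead("/exampleProperty", HelixProperty.class);
          System.out.println(readBack.getRecord().getSimpleField("field"));

          // Delete the bucketed data when it is no longer needed
          accessor.compressedBucketDelete("/exampleProperty");
        } finally {
          accessor.disconnect();
        }
      }
    }

Note that disconnect() closes both underlying HelixZkClients, so the accessor is meant to be a long-lived resource that is closed explicitly (it also implements AutoCloseable).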
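
The core of the write path is the chunking arithmetic: ceiling-divide the compressed payload by the bucket size and copy byte ranges into per-bucket ZNodes. Below is a self-contained sketch of that calculation; the class and method names are illustrative only, and the last-chunk handling uses Math.min rather than the modulo expression in the patch so that it also covers payloads whose length is an exact multiple of the bucket size.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class BucketSplitSketch {
      // Split a payload into chunks of at most bucketSize bytes (ceiling division)
      static List<byte[]> split(byte[] payload, int bucketSize) {
        int numBuckets = (payload.length + bucketSize - 1) / bucketSize;
        List<byte[]> buckets = new ArrayList<>();
        for (int i = 0; i < numBuckets; i++) {
          int start = i * bucketSize;
          int end = Math.min(start + bucketSize, payload.length);
          buckets.add(Arrays.copyOfRange(payload, start, end));
        }
        return buckets;
      }

      public static void main(String[] args) {
        // With the 50 KB default bucket size, a 120 KB compressed payload becomes 3 buckets:
        // 50 KB, 50 KB, and 20 KB
        List<byte[]> buckets = split(new byte[120 * 1024], 50 * 1024);
        System.out.println(buckets.size());        // 3
        System.out.println(buckets.get(2).length); // 20480
      }
    }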