This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit b1f59e6f8669633df47523f7e36e989afd496fa2 Author: Marcos Rico Peng <[email protected]> AuthorDate: Thu Jul 20 13:05:37 2023 -0400 Multithreading stress test lattice - CRUD puppies and Listener Tests (#2548) --------- Co-authored-by: mapeng <[email protected]> --- .../helix/metaclient/impl/zk/ZkMetaClient.java | 6 +- .../zk/TestMultiThreadStressTest/CreatePuppy.java | 78 +++++ .../zk/TestMultiThreadStressTest/DeletePuppy.java | 67 ++++ .../zk/TestMultiThreadStressTest/GetPuppy.java | 68 ++++ .../zk/TestMultiThreadStressTest/SetPuppy.java | 71 +++++ .../TestMultiThreadStressZKClient.java | 352 +++++++++++++++++++++ .../zk/TestMultiThreadStressTest/UpdatePuppy.java | 74 +++++ .../helix/metaclient/puppy/AbstractPuppy.java | 7 +- 8 files changed, 717 insertions(+), 6 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index 7f68ec9ca..691f31cde 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -101,9 +101,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { try { create(key, data, EntryMode.PERSISTENT); } catch (ZkException e) { - throw ZkMetaClientUtil.translateZkExceptionToMetaclientException(e); - } catch (Exception e) { - throw new MetaClientException(e); + throw translateZkExceptionToMetaclientException(e); } } @@ -113,7 +111,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { try { _zkClient.create(key, data, ZkMetaClientUtil.convertMetaClientMode(mode)); } catch (ZkException e) { - throw ZkMetaClientUtil.translateZkExceptionToMetaclientException(e); + throw translateZkExceptionToMetaclientException(e); } catch (KeeperException e) { throw new MetaClientException(e); } diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/CreatePuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/CreatePuppy.java new file mode 100644 index 000000000..3e28df06b --- /dev/null +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/CreatePuppy.java @@ -0,0 +1,78 @@ +package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.metaclient.api.MetaClientInterface; +import org.apache.helix.metaclient.exception.MetaClientNodeExistsException; +import org.apache.helix.metaclient.puppy.AbstractPuppy; +import org.apache.helix.metaclient.puppy.PuppySpec; + +import java.util.Random; + +public class CreatePuppy extends AbstractPuppy { + + private final Random _random; + private final String _parentPath = "/test"; + + public CreatePuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) { + super(metaclient, puppySpec); + _random = new Random(); + } + + @Override + protected void bark() { + // Implement the chaos logic for creating nodes + int randomNumber = _random.nextInt(_puppySpec.getNumberDiffPaths()); + if (shouldIntroduceError()) { + try { + // Simulate an error by creating an invalid path + _metaclient.create("invalid", "test"); + } catch (IllegalArgumentException e) { // Catch invalid exception + System.out.println(Thread.currentThread().getName() + " tried to create an invalid path" + " at time: " + System.currentTimeMillis()); + // Expected exception + } + } else { + // Normal behavior - create a new node + try { + System.out.println(Thread.currentThread().getName() + " is attempting to create node: " + randomNumber + " at time: " + System.currentTimeMillis()); + _metaclient.create(_parentPath + "/" + randomNumber,"test"); + System.out.println(Thread.currentThread().getName() + " successfully created node " + randomNumber + " at time: " + System.currentTimeMillis()); + _eventChangeCounterMap.put(String.valueOf(randomNumber), _eventChangeCounterMap.getOrDefault(String.valueOf(randomNumber), 0) + 1); + } catch (MetaClientNodeExistsException e) { + // Expected exception + System.out.println(Thread.currentThread().getName() + " failed to create node " + randomNumber + " at time: " + System.currentTimeMillis() + ", it already exists"); + } + } + } + + @Override + protected void cleanup() { + // Implement the recovery logic by deleting the created documents + _metaclient.recursiveDelete(_parentPath); + } + + private boolean shouldIntroduceError() { + float randomValue = _random.nextFloat(); + return randomValue < _puppySpec.getErrorRate(); + } +} + + + diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/DeletePuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/DeletePuppy.java new file mode 100644 index 000000000..e0e1b7b5c --- /dev/null +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/DeletePuppy.java @@ -0,0 +1,67 @@ +package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.metaclient.api.MetaClientInterface; +import org.apache.helix.metaclient.puppy.AbstractPuppy; +import org.apache.helix.metaclient.puppy.PuppySpec; + +import java.util.Random; + +public class DeletePuppy extends AbstractPuppy { + + private final Random _random; + private final String _parentPath = "/test"; + + public DeletePuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) { + super(metaclient, puppySpec); + _random = new Random(); + } + + @Override + protected void bark() { + int randomNumber = _random.nextInt(_puppySpec.getNumberDiffPaths()); + if (shouldIntroduceError()) { + try { + _metaclient.delete("invalid"); + _unhandledErrorCounter++; + } catch (IllegalArgumentException e) { + System.out.println(Thread.currentThread().getName() + " intentionally deleted an invalid path" + " at time: " + System.currentTimeMillis() ); + } + } else { + System.out.println(Thread.currentThread().getName() + " is attempting to delete node: " + randomNumber + " at time: " + System.currentTimeMillis()); + if (_metaclient.delete(_parentPath + "/" + randomNumber)) { + System.out.println(Thread.currentThread().getName() + " successfully deleted node " + randomNumber + " at time: " + System.currentTimeMillis()); + _eventChangeCounterMap.put(String.valueOf(randomNumber), _eventChangeCounterMap.getOrDefault(String.valueOf(randomNumber), 0) + 1); + } else { + System.out.println(Thread.currentThread().getName() + " failed to delete node " + randomNumber + " at time: " + System.currentTimeMillis() + ", it does not exist"); + } + } + } + + @Override + protected void cleanup() { + _metaclient.recursiveDelete(_parentPath); + } + + private boolean shouldIntroduceError() { + return _random.nextFloat() < _puppySpec.getErrorRate(); + } +} \ No newline at end of file diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/GetPuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/GetPuppy.java new file mode 100644 index 000000000..fe24b2bd3 --- /dev/null +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/GetPuppy.java @@ -0,0 +1,68 @@ +package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.metaclient.api.MetaClientInterface; +import org.apache.helix.metaclient.puppy.AbstractPuppy; +import org.apache.helix.metaclient.puppy.PuppySpec; + +import java.util.Objects; +import java.util.Random; + +public class GetPuppy extends AbstractPuppy { + + private final Random _random; + private final String _parentPath = "/test"; + + public GetPuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) { + super(metaclient, puppySpec); + _random = new Random(); + } + + @Override + protected void bark() { + int randomNumber = _random.nextInt(_puppySpec.getNumberDiffPaths()); + if (shouldIntroduceError()) { + try { + _metaclient.get("invalid"); + _unhandledErrorCounter++; + } catch (IllegalArgumentException e) { + System.out.println(Thread.currentThread().getName() + " intentionally tried to read an invalid path" + " at time: " + System.currentTimeMillis()); + } + } else { + System.out.println(Thread.currentThread().getName() + " is attempting to read node: " + randomNumber + " at time: " + System.currentTimeMillis()); + String nodeValue = _metaclient.get(_parentPath + "/" + randomNumber); + if (Objects.equals(nodeValue, null)) { + System.out.println(Thread.currentThread().getName() + " failed to read node " + randomNumber + " at time: " + System.currentTimeMillis() + ", it does not exist"); + } else { + System.out.println(Thread.currentThread().getName() + " successfully read node " + randomNumber + " at time: " + System.currentTimeMillis()); + } + } + } + + @Override + protected void cleanup() { + _metaclient.recursiveDelete(_parentPath); + } + + private boolean shouldIntroduceError() { + return _random.nextFloat() < _puppySpec.getErrorRate(); + } +} diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/SetPuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/SetPuppy.java new file mode 100644 index 000000000..c0de4ece7 --- /dev/null +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/SetPuppy.java @@ -0,0 +1,71 @@ +package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.metaclient.api.MetaClientInterface; +import org.apache.helix.metaclient.exception.MetaClientNoNodeException; +import org.apache.helix.metaclient.puppy.AbstractPuppy; +import org.apache.helix.metaclient.puppy.PuppySpec; + +import java.util.Random; + +public class SetPuppy extends AbstractPuppy { + + private final Random _random; + private final String _parentPath = "/test"; + + public SetPuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) { + super(metaclient, puppySpec); + _random = new Random(); + } + + @Override + protected void bark() { + int randomNumber = _random.nextInt(_puppySpec.getNumberDiffPaths()); + if (shouldIntroduceError()) { + try { + _metaclient.set("invalid", "test", -1); + } catch (IllegalArgumentException e) { + System.out.println(Thread.currentThread().getName() + " intentionally called set on an invalid path" + " at time: " + System.currentTimeMillis()); + } + } else { + try { + System.out.println(Thread.currentThread().getName() + " is attempting to set node: " + randomNumber + " at time: " + System.currentTimeMillis()); + _metaclient.set(_parentPath + "/" + randomNumber, "test", -1); + _eventChangeCounterMap.put(String.valueOf(randomNumber), _eventChangeCounterMap.getOrDefault(String.valueOf(randomNumber), 0) + 1); + System.out.println( + Thread.currentThread().getName() + " successfully set node " + randomNumber + " at time: " + + System.currentTimeMillis()); + } catch (MetaClientNoNodeException e) { + System.out.println(Thread.currentThread().getName() + " failed to set node " + randomNumber + " at time: " + System.currentTimeMillis() + ", it does not exist"); + } + } + } + + @Override + protected void cleanup() { + _metaclient.recursiveDelete(_parentPath); + } + + private boolean shouldIntroduceError() { + float randomValue = _random.nextFloat(); + return randomValue < _puppySpec.getErrorRate(); + } +} diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/TestMultiThreadStressZKClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/TestMultiThreadStressZKClient.java new file mode 100644 index 000000000..5ee026bc1 --- /dev/null +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/TestMultiThreadStressZKClient.java @@ -0,0 +1,352 @@ +package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.metaclient.api.ChildChangeListener; +import org.apache.helix.metaclient.impl.zk.ZkMetaClient; +import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase; +import org.apache.helix.metaclient.puppy.ExecDelay; +import org.apache.helix.metaclient.puppy.PuppyManager; +import org.apache.helix.metaclient.puppy.PuppyMode; +import org.apache.helix.metaclient.puppy.PuppySpec; +import org.apache.helix.metaclient.puppy.AbstractPuppy; +import org.testng.Assert; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class TestMultiThreadStressZKClient extends ZkMetaClientTestBase { + + private ZkMetaClient<String> _zkMetaClient; + private final String zkParentKey = "/test"; + + private final long TIMEOUT = 60; // The desired timeout duration of tests in seconds + + @BeforeTest + private void setUp() { + this._zkMetaClient = createZkMetaClient(); + this._zkMetaClient.connect(); + } + + @Test + public void testCreatePuppy() { + _zkMetaClient.create(zkParentKey, "test"); + + PuppySpec puppySpec = new org.apache.helix.metaclient.puppy.PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 5); + CreatePuppy createPuppy = new CreatePuppy(_zkMetaClient, puppySpec); + CreatePuppy createPuppy2 = new CreatePuppy(_zkMetaClient, puppySpec); + CreatePuppy createPuppy3 = new CreatePuppy(_zkMetaClient, puppySpec); + + PuppyManager puppyManager = new PuppyManager(); + puppyManager.addPuppy(createPuppy); + puppyManager.addPuppy(createPuppy2); + puppyManager.addPuppy(createPuppy3); + + puppyManager.start(TIMEOUT); + + assertNoExceptions(puppyManager, null); + + // cleanup + _zkMetaClient.recursiveDelete(zkParentKey); + Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0); + } + + @Test + public void testDeletePuppy() { + _zkMetaClient.create(zkParentKey, "test"); + + PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 5); + CreatePuppy createPuppy = new CreatePuppy(_zkMetaClient, puppySpec); + DeletePuppy deletePuppy = new DeletePuppy(_zkMetaClient, puppySpec); + + PuppyManager puppyManager = new PuppyManager(); + puppyManager.addPuppy(createPuppy); + puppyManager.addPuppy(deletePuppy); + + puppyManager.start(TIMEOUT); + + assertNoExceptions(puppyManager, null); + + // cleanup + _zkMetaClient.recursiveDelete(zkParentKey); + Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0); + } + + @Test + public void testGetPuppy() { + _zkMetaClient.create(zkParentKey, "test"); + + PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 5); + CreatePuppy createPuppy = new CreatePuppy(_zkMetaClient, puppySpec); + GetPuppy getPuppy = new GetPuppy(_zkMetaClient, puppySpec); + + PuppyManager puppyManager = new PuppyManager(); + puppyManager.addPuppy(createPuppy); + puppyManager.addPuppy(getPuppy); + + puppyManager.start(TIMEOUT); + + assertNoExceptions(puppyManager, null); + + // cleanup + _zkMetaClient.recursiveDelete(zkParentKey); + Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0); + } + + @Test + public void testSetPuppy() { + _zkMetaClient.create(zkParentKey, "test"); + + PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 5); + CreatePuppy createPuppy = new CreatePuppy(_zkMetaClient, puppySpec); + SetPuppy setPuppy = new SetPuppy(_zkMetaClient, puppySpec); + + PuppyManager puppyManager = new PuppyManager(); + puppyManager.addPuppy(createPuppy); + puppyManager.addPuppy(setPuppy); + + puppyManager.start(TIMEOUT); + + assertNoExceptions(puppyManager, null); + + // cleanup + _zkMetaClient.recursiveDelete(zkParentKey); + Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0); + } + + @Test + public void testUpdatePuppy() { + _zkMetaClient.create(zkParentKey, "test"); + + PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 5); + CreatePuppy createPuppy = new CreatePuppy(_zkMetaClient, puppySpec); + UpdatePuppy updatePuppy = new UpdatePuppy(_zkMetaClient, puppySpec); + + PuppyManager puppyManager = new PuppyManager(); + puppyManager.addPuppy(createPuppy); + puppyManager.addPuppy(updatePuppy); + + puppyManager.start(TIMEOUT); + + assertNoExceptions(puppyManager, null); + + // cleanup + _zkMetaClient.recursiveDelete(zkParentKey); + Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0); + } + + @Test + public void testCrudPuppies() { + _zkMetaClient.create(zkParentKey, "test"); + + PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 5); + CreatePuppy createPuppy = new CreatePuppy(_zkMetaClient, puppySpec); + GetPuppy getPuppy = new GetPuppy(_zkMetaClient, puppySpec); + DeletePuppy deletePuppy = new DeletePuppy(_zkMetaClient, puppySpec); + SetPuppy setPuppy = new SetPuppy(_zkMetaClient, puppySpec); + UpdatePuppy updatePuppy = new UpdatePuppy(_zkMetaClient, puppySpec); + + PuppyManager puppyManager = new PuppyManager(); + puppyManager.addPuppy(createPuppy); + puppyManager.addPuppy(getPuppy); + puppyManager.addPuppy(deletePuppy); + puppyManager.addPuppy(setPuppy); + puppyManager.addPuppy(updatePuppy); + + puppyManager.start(TIMEOUT); + + assertNoExceptions(puppyManager, null); + + // cleanup + _zkMetaClient.recursiveDelete(zkParentKey); + Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0); + } + + + @Test + public void testBasicParentListenerPuppy() { + _zkMetaClient.create(zkParentKey, "test"); + AtomicInteger globalChildChangeCounter = new AtomicInteger(); + ChildChangeListener childChangeListener = (changedPath, changeType) -> { + globalChildChangeCounter.addAndGet(1); + System.out.println("-------------- Child change detected: " + changeType + " at path: " + changedPath + " number of changes: " + globalChildChangeCounter.get()); + }; + + _zkMetaClient.subscribeChildChanges(zkParentKey, childChangeListener, false); + + PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 5); + CreatePuppy createPuppy = new CreatePuppy(_zkMetaClient, puppySpec); + + PuppyManager puppyManager = new PuppyManager(); + puppyManager.addPuppy(createPuppy); + + puppyManager.start(TIMEOUT); + + assertNoExceptions(puppyManager, globalChildChangeCounter); + + // cleanup + _zkMetaClient.unsubscribeChildChanges(zkParentKey, childChangeListener); + _zkMetaClient.recursiveDelete(zkParentKey); + Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0); + } + + @Test + public void testComplexParentListenerPuppy() { + _zkMetaClient.create(zkParentKey, "test"); + // Global counter for all child changes + AtomicInteger globalChildChangeCounter = new AtomicInteger(); + ChildChangeListener childChangeListener = (changedPath, changeType) -> { + globalChildChangeCounter.addAndGet(1); + System.out.println("-------------- Child change detected: " + changeType + " at path: " + changedPath + " number of changes: " + globalChildChangeCounter.get()); + }; + + + _zkMetaClient.subscribeChildChanges(zkParentKey, childChangeListener, false); + + PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 5); + CreatePuppy createPuppy = new CreatePuppy(_zkMetaClient, puppySpec); + GetPuppy getPuppy = new GetPuppy(_zkMetaClient, puppySpec); + DeletePuppy deletePuppy = new DeletePuppy(_zkMetaClient, puppySpec); + SetPuppy setPuppy = new SetPuppy(_zkMetaClient, puppySpec); + UpdatePuppy updatePuppy = new UpdatePuppy(_zkMetaClient, puppySpec); + + PuppyManager puppyManager = new PuppyManager(); + puppyManager.addPuppy(createPuppy); + puppyManager.addPuppy(getPuppy); + puppyManager.addPuppy(deletePuppy); + puppyManager.addPuppy(setPuppy); + puppyManager.addPuppy(updatePuppy); + + puppyManager.start(TIMEOUT); + + assertNoExceptions(puppyManager, globalChildChangeCounter); + + // cleanup + _zkMetaClient.recursiveDelete(zkParentKey); + Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0); + _zkMetaClient.unsubscribeChildChanges(zkParentKey, childChangeListener); + _zkMetaClient.delete(zkParentKey); + } + + + @Test + public void testChildListenerPuppy() { + _zkMetaClient.create(zkParentKey, "test"); + // Setting num diff paths to 3 until we find a better way of scaling listeners. + PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 3); + CreatePuppy createPuppy = new CreatePuppy(_zkMetaClient, puppySpec); + GetPuppy getPuppy = new GetPuppy(_zkMetaClient, puppySpec); + DeletePuppy deletePuppy = new DeletePuppy(_zkMetaClient, puppySpec); + SetPuppy setPuppy = new SetPuppy(_zkMetaClient, puppySpec); + UpdatePuppy updatePuppy = new UpdatePuppy(_zkMetaClient, puppySpec); + + PuppyManager puppyManager = new PuppyManager(); + puppyManager.addPuppy(createPuppy); + puppyManager.addPuppy(getPuppy); + puppyManager.addPuppy(deletePuppy); + puppyManager.addPuppy(setPuppy); + puppyManager.addPuppy(updatePuppy); + + // Create a child listener for each child defined in number diff paths in puppyspec. + // TODO: Make this a parameter for a loop. + AtomicInteger childChangeCounter0 = new AtomicInteger(); + ChildChangeListener childChangeListener0 = (changedPath, changeType) -> { + childChangeCounter0.addAndGet(1); + System.out.println("-------------- Child change detected: " + changeType + " at path: " + changedPath + " number of changes: " + childChangeCounter0.get()); + }; + _zkMetaClient.subscribeChildChanges("/test/0", childChangeListener0, false); + + AtomicInteger childChangeCounter1 = new AtomicInteger(); + ChildChangeListener childChangeListener1 = (changedPath, changeType) -> { + childChangeCounter1.addAndGet(1); + System.out.println("-------------- Child change detected: " + changeType + " at path: " + changedPath + " number of changes: " + childChangeCounter1.get()); + }; + _zkMetaClient.subscribeChildChanges("/test/1", childChangeListener1, false); + + AtomicInteger childChangeCounter2 = new AtomicInteger(); + ChildChangeListener childChangeListener2 = (changedPath, changeType) -> { + childChangeCounter2.addAndGet(1); + System.out.println("-------------- Child change detected: " + changeType + " at path: " + changedPath + " number of changes: " + childChangeCounter2.get()); + }; + _zkMetaClient.subscribeChildChanges("/test/2", childChangeListener2, false); + + puppyManager.start(TIMEOUT); + + assertNoExceptions(puppyManager, null); + + // Add all event changes from all puppies and compare with child change listener + // Inner merged by path + Map<String, Integer> mergedEventChangeCounterMap = new HashMap<>(); + for (AbstractPuppy puppy : puppyManager.getPuppies()) { + puppy._eventChangeCounterMap.forEach((key, value) -> { + if (mergedEventChangeCounterMap.containsKey(key)) { + mergedEventChangeCounterMap.put(key, mergedEventChangeCounterMap.get(key) + value); + } else { + mergedEventChangeCounterMap.put(key, value); + } + }); + } + + System.out.println("Merged event change counter map: " + mergedEventChangeCounterMap); + System.out.println("Child change counter 0: " + childChangeCounter0); + System.out.println("Child change counter 1: " + childChangeCounter1); + System.out.println("Child change counter 2: " + childChangeCounter2); + Assert.assertEquals(childChangeCounter0.get(), mergedEventChangeCounterMap.getOrDefault("0", 0).intValue()); + Assert.assertEquals(childChangeCounter1.get(), mergedEventChangeCounterMap.getOrDefault("1", 0).intValue()); + Assert.assertEquals(childChangeCounter2.get(), mergedEventChangeCounterMap.getOrDefault("2", 0).intValue()); + + // cleanup + _zkMetaClient.unsubscribeChildChanges("/test/0", childChangeListener0); + _zkMetaClient.unsubscribeChildChanges("/test/1", childChangeListener1); + _zkMetaClient.unsubscribeChildChanges("/test/2", childChangeListener2); + _zkMetaClient.recursiveDelete(zkParentKey); + Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0); + } + + private void assertNoExceptions(PuppyManager puppyManager, AtomicInteger globalChangeCounter) { + int totalUnhandledErrors = 0; + int totalEventChanges = 0; + + // Add all change counters and compare with event change listener + for (AbstractPuppy puppy : puppyManager.getPuppies()) { + AtomicInteger totalHandledErrors = new AtomicInteger(); + puppy._eventChangeCounterMap.forEach((key, value) -> { + totalHandledErrors.addAndGet(value); + }); + + System.out.println("Change counter: " + totalHandledErrors + " for " + puppy.getClass()); + System.out.println("Error counter: " + puppy._unhandledErrorCounter + " for " + puppy.getClass()); + totalUnhandledErrors += puppy._unhandledErrorCounter; + totalEventChanges += totalHandledErrors.get(); + } + + // Assert no unhandled (unexpected) exceptions and that the child change listener placed on + // test parent node (/test) caught all successful changes that were recorded by each puppy + Assert.assertEquals(totalUnhandledErrors, 0); + + // Assert that the global change counter matches the total number of events recorded by each puppy + if (globalChangeCounter != null) { + Assert.assertEquals(totalEventChanges, globalChangeCounter.get()); + } + } +} diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/UpdatePuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/UpdatePuppy.java new file mode 100644 index 000000000..721d507eb --- /dev/null +++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/UpdatePuppy.java @@ -0,0 +1,74 @@ +package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.metaclient.api.MetaClientInterface; +import org.apache.helix.metaclient.exception.MetaClientNoNodeException; +import org.apache.helix.metaclient.puppy.AbstractPuppy; +import org.apache.helix.metaclient.puppy.PuppySpec; + +import java.util.Random; + +public class UpdatePuppy extends AbstractPuppy { + + private final Random _random; + private final String _parentPath = "/test"; + + public UpdatePuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) { + super(metaclient, puppySpec); + _random = new Random(); + } + + @Override + protected void bark() throws Exception { + int randomNumber = _random.nextInt(_puppySpec.getNumberDiffPaths()); + if (shouldIntroduceError()) { + try { + _metaclient.update("invalid", (data) -> "foo"); + } catch (IllegalArgumentException e) { + System.out.println(Thread.currentThread().getName() + " intentionally tried to update an invalid path" + " at time: " + System.currentTimeMillis()); + } + } else { + try { + System.out.println(Thread.currentThread().getName() + " is attempting to update node: " + randomNumber + " at time: " + System.currentTimeMillis()); + _metaclient.update(_parentPath + "/" + randomNumber, (data) -> "foo"); + _eventChangeCounterMap.put(String.valueOf(randomNumber), _eventChangeCounterMap.getOrDefault(String.valueOf(randomNumber), 0) + 1); + System.out.println(Thread.currentThread().getName() + " successfully updated node " + randomNumber + " at time: " + + System.currentTimeMillis()); + } catch (MetaClientNoNodeException e) { + System.out.println(Thread.currentThread().getName() + " failed to update node " + randomNumber + " at time: " + System.currentTimeMillis() + ", it does not exist"); + } catch (IllegalArgumentException e) { + if (!e.getMessage().equals("Can not subscribe one time watcher when ZkClient is using PersistWatcher")) { + throw e; + } + } + } + } + + @Override + protected void cleanup() { + _metaclient.recursiveDelete(_parentPath); + } + + private boolean shouldIntroduceError() { + float randomValue = _random.nextFloat(); + return randomValue < _puppySpec.getErrorRate(); + } +} diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/puppy/AbstractPuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/puppy/AbstractPuppy.java index 85137fc17..bfbbb915d 100644 --- a/meta-client/src/test/java/org/apache/helix/metaclient/puppy/AbstractPuppy.java +++ b/meta-client/src/test/java/org/apache/helix/metaclient/puppy/AbstractPuppy.java @@ -29,8 +29,9 @@ public abstract class AbstractPuppy implements Runnable { protected MetaClientInterface<String> _metaclient; protected PuppySpec _puppySpec; - public HashMap<String, Integer> _eventChangeCounterMap; - protected int _unhandledErrorCounter; + public final HashMap<String, Integer> _eventChangeCounterMap; + public int _unhandledErrorCounter; + public AbstractPuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) { _metaclient = metaclient; @@ -69,6 +70,8 @@ public abstract class AbstractPuppy implements Runnable { try { Thread.sleep(getPuppySpec().getExecDelay().getNextDelay()); } catch (InterruptedException e) { + cleanup(); + break; // Handle interruption if necessary } }
