This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 189e9c10c7 Added PropStore.invalidate for immediate consistency on prop change (#5582) 189e9c10c7 is described below commit 189e9c10c779c5930ae87ba757ffbdcb0e9964c8 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed May 28 10:21:59 2025 -0400 Added PropStore.invalidate for immediate consistency on prop change (#5582) ZooPropStore is eventually consistent when a property is changed as it waits for the PropStoreWatcher to receive the event which calls remove on the Caffeine cache. Subsequent calls to ZooPropStore.get will rely on the ZooPropLoader to re-read the information from ZooKeeper and populate the Caffeine cache. ZooBasedConfiguration does not read directly from ZooPropStore and instead reads a snapshot of the data using PropSnapshot. When ZooBasedConfiguration.invalidateCache is called it just marks the PropSnapshot as needing an update, which will re-read from the underlying ZooPropStore. However, if the PropStoreWatcher has not yet fired, then the information read will be the same as the ZooPropStore has not yet been updated. The new PropStore.invalidate method implementation in ZooPropStore removes the node from the Caffeine cache, the same thing that the PropStoreWatcher will do when it receives the event. When a property is changed in the local process, then the cache entry will be immediately invalidated and re-read. The downside to this change is that it will be read twice, when the PropStoreWatcher receives the event. Closes #5541 --- .../accumulo/server/conf/store/PropStore.java | 7 ++ .../server/conf/store/impl/ZooPropStore.java | 5 ++ .../accumulo/server/conf/util/PropSnapshot.java | 1 + .../server/conf/NamespaceConfigurationTest.java | 2 + .../server/conf/SystemConfigurationTest.java | 2 + .../server/conf/TableConfigurationTest.java | 14 ++++ .../server/conf/util/PropSnapshotTest.java | 6 ++ .../test/functional/AccumuloConfigurationIT.java | 87 ++++++++++++++++++++++ 8 files changed, 124 insertions(+) diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/store/PropStore.java b/server/base/src/main/java/org/apache/accumulo/server/conf/store/PropStore.java index 3d586e4329..036c7a4e6a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/store/PropStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/store/PropStore.java @@ -129,4 +129,11 @@ public interface PropStore { * @return true if the stored version matches the provided expected version. */ boolean validateDataVersion(PropStoreKey<?> storeKey, long expectedVersion); + + /** + * Invalidate the properties associated with the provided key so that they are re-fetched. + * + * @param storeKey the prop cache key + */ + void invalidate(PropStoreKey<?> storeKey); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java index d25cbd2bd4..6168e18d8c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java @@ -450,4 +450,9 @@ public class ZooPropStore implements PropStore, PropChangeListener { return true; } + @Override + public void invalidate(PropStoreKey<?> storeKey) { + cache.remove(storeKey); + } + } diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/util/PropSnapshot.java b/server/base/src/main/java/org/apache/accumulo/server/conf/util/PropSnapshot.java index 3f47cbbcbd..4f2f3fc32f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/util/PropSnapshot.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/util/PropSnapshot.java @@ -81,6 +81,7 @@ public class PropSnapshot implements PropChangeListener { updateLock.lock(); try { needsUpdate.set(true); + propStore.invalidate(propStoreKey); } finally { updateLock.unlock(); } diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/NamespaceConfigurationTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/NamespaceConfigurationTest.java index 7fce935b7e..08ebc9225a 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/NamespaceConfigurationTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/NamespaceConfigurationTest.java @@ -81,6 +81,8 @@ public class NamespaceConfigurationTest { Map.of(Property.INSTANCE_SECRET.getKey(), "sekrit"))).anyTimes(); propStore.registerAsListener(eq(nsPropStoreKey), anyObject()); expectLastCall().anyTimes(); + propStore.invalidate(nsPropStoreKey); + expectLastCall().anyTimes(); replay(propStore, context); diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/SystemConfigurationTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/SystemConfigurationTest.java index 17a8a561fa..0b73b98563 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/SystemConfigurationTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/SystemConfigurationTest.java @@ -117,6 +117,8 @@ public class SystemConfigurationTest { Map.of(GC_PORT.getKey(), "3456", TSERV_SCAN_MAX_OPENFILES.getKey(), "27", TABLE_BLOOM_ENABLED.getKey(), "false", TABLE_BLOOM_SIZE.getKey(), "2048")); expect(propStore.get(eq(sysPropKey))).andReturn(sysUpdateProps).anyTimes(); + propStore.invalidate(sysPropKey); + expectLastCall().atLeastOnce(); replay(propStore); sysConfig.zkChangeEvent(sysPropKey); diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java index 9c60673773..e01f2258d4 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java @@ -138,6 +138,8 @@ public class TableConfigurationTest { expect(propStore.get(eq(propKey))) .andReturn(new VersionedProperties(37, Instant.now(), Map.of(p.getKey(), "sekrit"))) .anyTimes(); + propStore.invalidate(propKey); + expectLastCall().atLeastOnce(); replay(propStore); tableConfig.zkChangeEvent(propKey); @@ -159,6 +161,8 @@ public class TableConfigurationTest { .anyTimes(); expect(propStore.get(eq(TablePropKey.of(instanceId, TID)))) .andReturn(new VersionedProperties(Map.of())).anyTimes(); + propStore.invalidate(NamespacePropKey.of(instanceId, NID)); + expectLastCall().atLeastOnce(); replay(propStore); nsConfig.zkChangeEvent(NamespacePropKey.of(instanceId, NID)); @@ -184,6 +188,10 @@ public class TableConfigurationTest { expect(propStore.get(eq(TablePropKey.of(instanceId, TID)))) .andReturn(new VersionedProperties(4, Instant.now(), Map.of("foo", "bar", "tick", "tock"))) .anyTimes(); + propStore.invalidate(TablePropKey.of(instanceId, TID)); + expectLastCall().atLeastOnce(); + propStore.invalidate(NamespacePropKey.of(instanceId, NID)); + expectLastCall().atLeastOnce(); replay(propStore); @@ -221,6 +229,10 @@ public class TableConfigurationTest { expect(propStore.get(eq(TablePropKey.of(instanceId, TID)))).andReturn(new VersionedProperties(4, Instant.now(), Map.of("filter", "not_returned_by_table", "foo", "bar", "tick", "tock"))) .anyTimes(); + propStore.invalidate(TablePropKey.of(instanceId, TID)); + expectLastCall().atLeastOnce(); + propStore.invalidate(NamespacePropKey.of(instanceId, NID)); + expectLastCall().atLeastOnce(); replay(propStore); @@ -257,6 +269,8 @@ public class TableConfigurationTest { .once(); expect(propStore.get(eq(propKey))) .andReturn(new VersionedProperties(39, Instant.now(), Map.of(p.getKey(), "sekrit"))).once(); + propStore.invalidate(propKey); + expectLastCall().atLeastOnce(); replay(propStore); diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/util/PropSnapshotTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/util/PropSnapshotTest.java index 155db98260..ae218626b0 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/util/PropSnapshotTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/util/PropSnapshotTest.java @@ -70,6 +70,8 @@ class PropSnapshotTest { // after update expect(propStore.get(eq(SystemPropKey.of(instanceId)))) .andReturn(new VersionedProperties(124, Instant.now(), Map.of("k3", "v3"))).once(); + propStore.invalidate(SystemPropKey.of(instanceId)); + expectLastCall().atLeastOnce(); replay(propStore); PropSnapshot snapshot = PropSnapshot.create(SystemPropKey.of(instanceId), propStore); @@ -97,6 +99,8 @@ class PropSnapshotTest { expect(propStore.get(eq(sysPropKey))).andReturn( new VersionedProperties(100, Instant.now(), Map.of(TABLE_BLOOM_ENABLED.getKey(), "false"))) .once(); + propStore.invalidate(SystemPropKey.of(instanceId)); + expectLastCall().atLeastOnce(); replay(propStore); @@ -120,6 +124,8 @@ class PropSnapshotTest { expect(propStore.get(eq(sysPropKey))).andThrow(new IllegalStateException("Fake node delete")) .once(); + propStore.invalidate(sysPropKey); + expectLastCall().atLeastOnce(); replay(propStore); PropSnapshot snapshot = PropSnapshot.create(sysPropKey, propStore); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AccumuloConfigurationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AccumuloConfigurationIT.java new file mode 100644 index 0000000000..4a2865f8dc --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/AccumuloConfigurationIT.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.ServerContext; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class AccumuloConfigurationIT extends SharedMiniClusterBase { + + private static final String fakeProperty = "general.custom.fake.property"; + + private static class ConfigurationCallback implements MiniClusterConfigurationCallback { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + cfg.setProperty(fakeProperty, "1"); + } + + } + + @BeforeAll + public static void beforeTests() throws Exception { + SharedMiniClusterBase.startMiniClusterWithConfig(new ConfigurationCallback()); + } + + @AfterAll + public static void afterTests() { + SharedMiniClusterBase.stopMiniCluster(); + } + + @Test + public void testInvalidation() throws Exception { + + final ServerContext ctx = getCluster().getServerContext(); + String initialThreads = ctx.getConfiguration().get(fakeProperty); + + Timer timer = null; + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + timer = Timer.startNew(); + c.instanceOperations().setProperty(fakeProperty, "4"); + } + + ctx.getConfiguration().invalidateCache(); + + int oldValueReturned = 0; + while (ctx.getConfiguration().get(fakeProperty).equals(initialThreads)) { + oldValueReturned++; + Thread.sleep(25); + } + System.out.println("Configuration returned old value " + oldValueReturned + " times and took " + + timer.elapsed(TimeUnit.MILLISECONDS) + "ms"); + + assertEquals("4", ctx.getConfiguration().get(fakeProperty)); + assertEquals(0, oldValueReturned); + + } + +}