This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 8c2059c78130f8138d6295bef27d76ac5a413aaf Author: Hunter Lee <[email protected]> AuthorDate: Mon Jul 27 18:36:39 2020 -0700 Implement throttling for routing data update on cache miss This commit implements throttling for routing data updates by recording a timestamp of the last time the cache was reset in RoutingDataManager. It defines a default interval (5 seconds) but makes this interval configurable via System Properties. --- ...PropertyKeys.java => RoutingDataConstants.java} | 14 ++--- .../constant/RoutingSystemPropertyKeys.java | 5 ++ .../zookeeper/impl/client/FederatedZkClient.java | 38 ++++++++++++ .../zookeeper/routing/RoutingDataManager.java | 12 ++++ .../impl/client/TestFederatedZkClient.java | 70 +++++++++++++++++++++- 5 files changed, 128 insertions(+), 11 deletions(-) diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataConstants.java similarity index 66% copy from zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java copy to zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataConstants.java index a57075b..164c543 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingDataConstants.java @@ -19,15 +19,13 @@ package org.apache.helix.zookeeper.constant; * under the License. */ -/** - * This class contains various routing-related system property keys for multi-zk clients. - */ -public class RoutingSystemPropertyKeys { +public class RoutingDataConstants { /** - * If enabled, FederatedZkClient (multiZkClient) will invalidate the cached routing data and - * re-read the routing data from the routing data source upon ZK path sharding key cache miss. 
+ * Default interval that defines how frequently RoutingDataManager's routing data should be + * updated from the routing data source. This exists to apply throttling to the rate at which + * the ZkClient pulls routing data from the routing data source to avoid overloading the routing + * data source. */ - public static final String UPDATE_ROUTING_DATA_ON_CACHE_MISS = - "update.routing.data.on.cache.miss.enabled"; + public static final long DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS = 5 * 1000L; // 5 seconds } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java index a57075b..e22ad08 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/RoutingSystemPropertyKeys.java @@ -30,4 +30,9 @@ public class RoutingSystemPropertyKeys { */ public static final String UPDATE_ROUTING_DATA_ON_CACHE_MISS = "update.routing.data.on.cache.miss.enabled"; + + /** + * The interval to use between routing data updates from the routing data source. 
+ */ + public static final String ROUTING_DATA_UPDATE_INTERVAL_MS = "routing.data.update.interval.ms"; } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java index 4354537..dc55d53 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java @@ -30,6 +30,7 @@ import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData; import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.constant.RoutingDataConstants; import org.apache.helix.zookeeper.constant.RoutingSystemPropertyKeys; import org.apache.helix.zookeeper.exception.MultiZkException; import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; @@ -86,6 +87,7 @@ public class FederatedZkClient implements RealmAwareZkClient { private PathBasedZkSerializer _pathBasedZkSerializer; private final boolean _routingDataUpdateOnCacheMissEnabled = Boolean.parseBoolean( System.getProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS)); + private long _routingDataUpdateInterval; // TODO: support capacity of ZkClient number in one FederatedZkClient and do garbage collection. 
public FederatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, @@ -102,6 +104,7 @@ public class FederatedZkClient implements RealmAwareZkClient { _clientConfig = clientConfig; _pathBasedZkSerializer = clientConfig.getZkSerializer(); _zkRealmToZkClientMap = new ConcurrentHashMap<>(); + getRoutingDataUpdateInterval(); } @Override @@ -587,6 +590,11 @@ public class FederatedZkClient implements RealmAwareZkClient { try { zkRealm = _metadataStoreRoutingData.getMetadataStoreRealm(path); } catch (NoSuchElementException e4) { + if (shouldThrottleRead()) { + // If routing data update from routing data source has taken place recently, + // then just skip the update and throw the exception + throw e4; + } // Try 2) Reset RoutingDataManager and re-read the routing data from routing data // source via I/O. Since RoutingDataManager's cache doesn't have it either, so we // synchronize on all threads by locking on FederatedZkClient.class. @@ -626,4 +634,34 @@ public class FederatedZkClient implements RealmAwareZkClient { + ". Instead, please use " + DEDICATED_ZK_CLIENT_FACTORY + " to create a dedicated RealmAwareZkClient for this operation."); } + + /** + * Resolves the routing data update interval value from System Properties. 
+ */ + private void getRoutingDataUpdateInterval() { + try { + _routingDataUpdateInterval = Long.parseLong( + System.getProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS)); + if (_routingDataUpdateInterval < 0) { + LOG.warn("FederatedZkClient::shouldThrottleRead(): invalid value: {} given for " + + "ROUTING_DATA_UPDATE_INTERVAL_MS, using the default value (5 sec) instead!", + _routingDataUpdateInterval); + _routingDataUpdateInterval = RoutingDataConstants.DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS; + } + } catch (NumberFormatException e) { + LOG.warn("FederatedZkClient::shouldThrottleRead(): failed to parse " + + "ROUTING_DATA_UPDATE_INTERVAL_MS, using the default value (5 sec) instead!", e); + _routingDataUpdateInterval = RoutingDataConstants.DEFAULT_ROUTING_DATA_UPDATE_INTERVAL_MS; + } + } + + /** + * Return whether the read request to routing data source should be throttled using the default + * routing data update interval. + * @return + */ + private boolean shouldThrottleRead() { + return System.currentTimeMillis() - RoutingDataManager.getInstance().getLastResetTimestamp() + < _routingDataUpdateInterval; + } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/RoutingDataManager.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/RoutingDataManager.java index 6df9616..853bd5c 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/RoutingDataManager.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/routing/RoutingDataManager.java @@ -51,6 +51,9 @@ public class RoutingDataManager { private final Map<String, MetadataStoreRoutingData> _metadataStoreRoutingDataMap = new ConcurrentHashMap<>(); + // Tracks the time at which reset() was called last. 
Used to throttle reset() + private volatile long _lastResetTimestamp; + // Singleton instance private static RoutingDataManager _instance; @@ -164,6 +167,15 @@ public class RoutingDataManager { _metadataStoreRoutingDataMap.clear(); _defaultMsdsEndpoint = System.getProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY); + _lastResetTimestamp = System.currentTimeMillis(); + } + + /** + * Returns the timestamp for the last reset(). + * @return + */ + public long getLastResetTimestamp() { + return _lastResetTimestamp; } /** diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java index 93e5892..e201905 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java @@ -279,6 +279,8 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase { throws IOException, InvalidRoutingDataException { // Enable routing data update upon cache miss System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "true"); + // Set the routing data update interval to 0 so there's no delay in testing + System.setProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS, "0"); RoutingDataManager.getInstance().getMetadataStoreRoutingData(); _msdsServer.stopServer(); @@ -375,7 +377,8 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase { // Shut down MSDS _msdsServer.stopServer(); // Disable System property - System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "false"); + System.clearProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS); + System.clearProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS); } /** @@ -402,6 +405,8 @@ public class TestFederatedZkClient extends 
RealmAwareZkClientTestBase { // Enable routing data update upon cache miss System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "true"); + // Set the routing data update interval to 0 so there's no delay in testing + System.setProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS, "0"); RoutingDataManager.getInstance().reset(); RoutingDataManager.getInstance().getMetadataStoreRoutingData(RoutingDataReaderType.ZK, zkRealm); @@ -496,7 +501,66 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase { zkClient.deleteRecursively(MetadataStoreRoutingConstants.ROUTING_DATA_PATH); zkClient.close(); // Disable System property - System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "false"); + System.clearProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS); + System.clearProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS); + } + + /** + * Test that throttle based on last reset timestamp works correctly. Here, we use ZK as the + * routing data source. + * Test scenario: set the throttle value to a high value and check that routing data update from + * the routing data source does NOT happen (because it would be throttled). 
+ */ + @Test(dependsOnMethods = "testUpdateRoutingDataOnCacheMissZK") + public void testRoutingDataUpdateThrottle() throws InvalidRoutingDataException { + // Call reset to set the last reset() timestamp in RoutingDataManager + RoutingDataManager.getInstance().reset(); + + // Set up routing data in ZK with empty sharding key list + String zkRealm = "localhost:2127"; + String newShardingKey = "/throttle"; + ZkClient zkClient = + new ZkClient.Builder().setZkServer(zkRealm).setZkSerializer(new ZNRecordSerializer()) + .build(); + zkClient.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH, null, CreateMode.PERSISTENT); + ZNRecord zkRealmRecord = new ZNRecord(zkRealm); + zkRealmRecord.setListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY, + new ArrayList<>(TestConstants.TEST_KEY_LIST_1)); + zkClient.create(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + zkRealm, zkRealmRecord, + CreateMode.PERSISTENT); + + // Enable routing data update upon cache miss + System.setProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS, "true"); + // Set the throttle value to a very long value + System.setProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS, + String.valueOf(Integer.MAX_VALUE)); + + // Create a new FederatedZkClient, whose _routingDataUpdateInterval should be MAX_VALUE + FederatedZkClient federatedZkClient = new FederatedZkClient( + new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder() + .setRoutingDataSourceType(RoutingDataReaderType.ZK.name()) + .setRoutingDataSourceEndpoint(zkRealm).build(), + new RealmAwareZkClient.RealmAwareZkClientConfig()); + + // Add newShardingKey to ZK's routing data + zkRealmRecord.getListField(MetadataStoreRoutingConstants.ZNRECORD_LIST_FIELD_KEY) + .add(newShardingKey); + zkClient + .writeData(MetadataStoreRoutingConstants.ROUTING_DATA_PATH + "/" + zkRealm, zkRealmRecord); + + try { + Assert.assertFalse(federatedZkClient.exists(newShardingKey)); + Assert.fail("NoSuchElementException 
expected!"); + } catch (NoSuchElementException e) { + // Expected because it should not read from the routing data source because of the throttle + } + + // Clean up + zkClient.deleteRecursively(MetadataStoreRoutingConstants.ROUTING_DATA_PATH); + zkClient.close(); + federatedZkClient.close(); + System.clearProperty(RoutingSystemPropertyKeys.UPDATE_ROUTING_DATA_ON_CACHE_MISS); + System.clearProperty(RoutingSystemPropertyKeys.ROUTING_DATA_UPDATE_INTERVAL_MS); } /* @@ -504,7 +568,7 @@ public class TestFederatedZkClient extends RealmAwareZkClientTestBase { * TODO: test that all raw zkClients are closed after FederatedZkClient close() is called. This * could help avoid ZkClient leakage. */ - @Test(dependsOnMethods = "testUpdateRoutingDataOnCacheMissZK") + @Test(dependsOnMethods = "testRoutingDataUpdateThrottle") public void testClose() { Assert.assertFalse(_realmAwareZkClient.isClosed());
