This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-3.3.5
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3.5 by this push:
     new 6aacb3557f2 HDFS-16844: RBF: Adds resilancy when StateStore gets exceptions. (#5138)
6aacb3557f2 is described below

commit 6aacb3557f2ae1d3f34447e60a5e19bbd95d7abb
Author: Owen O'Malley <oomal...@linkedin.com>
AuthorDate: Fri Nov 18 17:24:08 2022 +0000

    HDFS-16844: RBF: Adds resilancy when StateStore gets exceptions. (#5138)

    Allows the StateStore to stay up when there are errors reading the data.
---
 .../resolver/MembershipNamenodeResolver.java        |   8 +-
 .../federation/resolver/MountTableResolver.java     |   4 +-
 .../server/federation/store/CachedRecordStore.java  |   3 +-
 .../federation/store/impl/MembershipStoreImpl.java  |   4 +-
 .../store/records/MockStateStoreDriver.java         | 139 +++++++++++++++++++++
 .../federation/store/records/TestRouterState.java   |  53 +++++++-
 6 files changed, 203 insertions(+), 8 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
index 66290039782..9d2dd1651a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java
@@ -123,9 +123,13 @@ public class MembershipNamenodeResolver
     // Our cache depends on the store, update it first
     try {
       MembershipStore membership = getMembershipStore();
-      membership.loadCache(force);
+      if (!membership.loadCache(force)) {
+        return false;
+      }
       DisabledNameserviceStore disabled = getDisabledNameserviceStore();
-      disabled.loadCache(force);
+      if (!disabled.loadCache(force)) {
+        return false;
+      }
     } catch (IOException e) {
       LOG.error("Cannot update membership from the State Store", e);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
index 797006ab1de..9d4804fb928 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java
@@ -346,7 +346,9 @@ public class MountTableResolver
     try {
       // Our cache depends on the store, update it first
       MountTableStore mountTable = this.getMountTableStore();
-      mountTable.loadCache(force);
+      if (!mountTable.loadCache(force)) {
+        return false;
+      }
 
       GetMountTableEntriesRequest request =
           GetMountTableEntriesRequest.newInstance("/");
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
index 7b28c03a529..613d8a78038 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
@@ -100,7 +100,7 @@ public abstract class CachedRecordStore<R extends BaseRecord>
    * @throws StateStoreUnavailableException If the cache is not initialized.
    */
   private void checkCacheAvailable() throws StateStoreUnavailableException {
-    if (!this.initialized) {
+    if (!getDriver().isDriverReady() || !this.initialized) {
       throw new StateStoreUnavailableException(
           "Cached State Store not initialized, " +
           getRecordClass().getSimpleName() + " records not valid");
@@ -125,7 +125,6 @@ public abstract class CachedRecordStore<R extends BaseRecord>
     } catch (IOException e) {
       LOG.error("Cannot get \"{}\" records from the State Store",
           getRecordClass().getSimpleName());
-      this.initialized = false;
       return false;
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
index a63a0f3b3ab..5d22b77afe2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MembershipStoreImpl.java
@@ -185,7 +185,9 @@ public class MembershipStoreImpl
 
   @Override
   public boolean loadCache(boolean force) throws IOException {
-    super.loadCache(force);
+    if (!super.loadCache(force)) {
+      return false;
+    }
 
     // Update local cache atomically
     cacheWriteLock.lock();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java
new file mode 100644
index 00000000000..57185a0a600
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.store.records;
+
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
+import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreBaseImpl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A mock StateStoreDriver that runs in memory that can force IOExceptions
+ * upon demand.
+ */
+public class MockStateStoreDriver extends StateStoreBaseImpl {
+  private boolean giveErrors = false;
+  private boolean initialized = false;
+  private final Map<String, Map<String, BaseRecord>> valueMap = new HashMap<>();
+
+  @Override
+  public boolean initDriver() {
+    initialized = true;
+    return true;
+  }
+
+  @Override
+  public <T extends BaseRecord> boolean initRecordStorage(String className,
+                                                          Class<T> clazz) {
+    return true;
+  }
+
+  @Override
+  public boolean isDriverReady() {
+    return initialized;
+  }
+
+  @Override
+  public void close() throws Exception {
+    valueMap.clear();
+    initialized = false;
+  }
+
+  /**
+   * Should this object throw an IOException on each following call?
+   * @param value should we throw errors?
+   */
+  public void setGiveErrors(boolean value) {
+    giveErrors = value;
+  }
+
+  /**
+   * Check to see if this StateStore should throw IOException on each call.
+   * @throws IOException thrown if giveErrors has been set
+   */
+  private void checkErrors() throws IOException {
+    if (giveErrors) {
+      throw new IOException("Induced errors");
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException {
+    checkErrors();
+    Map<String, BaseRecord> map = valueMap.get(StateStoreUtils.getRecordName(clazz));
+    List<T> results =
+        map != null ? new ArrayList<>((Collection<T>) map.values()) : new ArrayList<>();
+    return new QueryResult<>(results, System.currentTimeMillis());
+  }
+
+  @Override
+  public <T extends BaseRecord> boolean putAll(List<T> records,
+                                               boolean allowUpdate,
+                                               boolean errorIfExists)
+      throws IOException {
+    checkErrors();
+    for (T record : records) {
+      Map<String, BaseRecord> map =
+          valueMap.computeIfAbsent(StateStoreUtils.getRecordName(record.getClass()),
+              k -> new HashMap<>());
+      String key = record.getPrimaryKey();
+      BaseRecord oldRecord = map.get(key);
+      if (oldRecord == null || allowUpdate) {
+        map.put(key, record);
+      } else if (errorIfExists) {
+        throw new IOException("Record already exists for " + record.getClass()
+            + ": " + key);
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException {
+    checkErrors();
+    return valueMap.remove(StateStoreUtils.getRecordName(clazz)) != null;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T extends BaseRecord> int remove(Class<T> clazz,
+                                           Query<T> query)
+      throws IOException {
+    checkErrors();
+    int result = 0;
+    Map<String, BaseRecord> map =
+        valueMap.get(StateStoreUtils.getRecordName(clazz));
+    if (map != null) {
+      for (Iterator<BaseRecord> itr = map.values().iterator(); itr.hasNext();) {
+        BaseRecord record = itr.next();
+        if (query.matches((T) record)) {
+          itr.remove();
+          result += 1;
+        }
+      }
+    }
+    return result;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java
index dfe2bc98bf4..4289999429b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -20,8 +20,16 @@ package org.apache.hadoop.hdfs.server.federation.store.records;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
 import org.junit.Test;
 
@@ -40,7 +48,7 @@ public class TestRouterState {
 
   private static final RouterServiceState STATE = RouterServiceState.RUNNING;
 
-  private RouterState generateRecord() throws IOException {
+  private RouterState generateRecord() {
     RouterState record = RouterState.newInstance(ADDRESS, START_TIME, STATE);
     record.setVersion(VERSION);
     record.setCompileInfo(COMPILE_INFO);
@@ -82,4 +90,45 @@ public class TestRouterState {
 
     validateRecord(newRecord);
   }
+
+  @Test
+  public void testStateStoreResilience() throws Exception {
+    StateStoreService service = new StateStoreService();
+    Configuration conf = new Configuration();
+    conf.setClass(RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS,
+        MockStateStoreDriver.class,
+        StateStoreDriver.class);
+    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, false);
+    service.init(conf);
+    MockStateStoreDriver driver = (MockStateStoreDriver) service.getDriver();
+    // Add two records for block1
+    driver.put(MembershipState.newInstance("routerId", "ns1",
+        "ns1-ha1", "cluster1", "block1", "rpc1",
+        "service1", "lifeline1", "https", "nn01",
+        FederationNamenodeServiceState.ACTIVE, false), false, false);
+    driver.put(MembershipState.newInstance("routerId", "ns1",
+        "ns1-ha2", "cluster1", "block1", "rpc2",
+        "service2", "lifeline2", "https", "nn02",
+        FederationNamenodeServiceState.STANDBY, false), false, false);
+    // load the cache
+    service.loadDriver();
+    MembershipNamenodeResolver resolver = new MembershipNamenodeResolver(conf, service);
+    service.refreshCaches(true);
+
+    // look up block1
+    List<? extends FederationNamenodeContext> result =
+        resolver.getNamenodesForBlockPoolId("block1");
+    assertEquals(2, result.size());
+
+    // cause io errors and then reload the cache
+    driver.setGiveErrors(true);
+    long previousUpdate = service.getCacheUpdateTime();
+    service.refreshCaches(true);
+    assertEquals(previousUpdate, service.getCacheUpdateTime());
+
+    // make sure the old cache is still there
+    result = resolver.getNamenodesForBlockPoolId("block1");
+    assertEquals(2, result.size());
+    service.stop();
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
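
For readers who want the gist of the patch without walking the full diff: loadCache() now reports failure to its callers instead of discarding the cached records, so the router keeps serving the last data it successfully read while the State Store is unreachable. Below is a minimal, self-contained sketch of that retention pattern; the class and method names are hypothetical illustrations, not the RBF API.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the "keep the last good cache" pattern; not the RBF API. */
public class ResilientCacheSketch {

  /** A record source that may fail, standing in for a State Store driver. */
  interface RecordSource {
    List<String> fetch() throws IOException;
  }

  private final RecordSource source;
  private List<String> cache = Collections.emptyList();

  ResilientCacheSketch(RecordSource source) {
    this.source = source;
  }

  /**
   * Try to refresh the cache. Returns false and keeps the previous contents
   * if the source throws, mirroring how loadCache(force) in the patch above
   * propagates failure instead of marking the cache uninitialized.
   */
  boolean loadCache() {
    try {
      cache = new ArrayList<>(source.fetch());
      return true;
    } catch (IOException e) {
      // Keep serving the stale records; do not wipe state on a read error.
      return false;
    }
  }

  List<String> getCachedRecords() {
    return Collections.unmodifiableList(cache);
  }

  public static void main(String[] args) {
    final boolean[] giveErrors = {false};
    ResilientCacheSketch cached = new ResilientCacheSketch(() -> {
      if (giveErrors[0]) {
        throw new IOException("Induced errors");
      }
      return List.of("ns1-ha1", "ns1-ha2");
    });

    System.out.println(cached.loadCache());        // true: initial load succeeds
    giveErrors[0] = true;                          // the source now fails
    System.out.println(cached.loadCache());        // false: refresh reports failure
    System.out.println(cached.getCachedRecords()); // [ns1-ha1, ns1-ha2]: old cache survives
  }
}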