This is an automated email from the ASF dual-hosted git repository. omalley pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 03471a736ce HDFS-16851: RBF: Add a utility to dump the StateStore. (#5155) 03471a736ce is described below commit 03471a736ce53efeb7aacbf6d311b52032b12171 Author: Owen O'Malley <oomal...@linkedin.com> AuthorDate: Tue Nov 29 22:12:35 2022 +0000 HDFS-16851: RBF: Add a utility to dump the StateStore. (#5155) --- .../server/federation/store/StateStoreService.java | 9 +++ .../hadoop/hdfs/tools/federation/RouterAdmin.java | 74 +++++++++++++++++++++- .../src/site/markdown/HDFSRouterFederation.md | 11 ++++ .../federation/router/TestRouterAdminCLI.java | 73 ++++++++++++++++++++- .../store/records/MockStateStoreDriver.java | 19 ++++-- .../federation/store/records/TestRouterState.java | 1 + 6 files changed, 177 insertions(+), 10 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java index 201c7a325f1..77939799e72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java @@ -272,6 +272,15 @@ public class StateStoreService extends CompositeService { return null; } + /** + * Get the list of all RecordStores. + * @return a list of each RecordStore. + */ + @SuppressWarnings("unchecked") + public <T extends RecordStore<? extends BaseRecord>> List<T> getRecordStores() { + return new ArrayList<>((Collection<T>) recordStores.values()); + } + /** * List of records supported by this State Store. 
* diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java index d7fcf862fb6..3ecb4c2caba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.tools.federation; import java.io.IOException; +import java.io.PrintStream; import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; @@ -26,6 +27,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.TreeMap; import java.util.regex.Pattern; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -46,6 +48,10 @@ import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.router.RouterClient; import org.apache.hadoop.hdfs.server.federation.router.RouterQuotaUsage; import org.apache.hadoop.hdfs.server.federation.router.RouterStateManager; +import org.apache.hadoop.hdfs.server.federation.store.CachedRecordStore; +import org.apache.hadoop.hdfs.server.federation.store.RecordStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.DisableNameserviceRequest; @@ -70,7 +76,9 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableE import 
org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.federation.store.records.impl.pb.PBRecord; import org.apache.hadoop.ipc.ProtobufRpcEngine2; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RefreshResponse; @@ -97,6 +105,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.Quota.andByStorage public class RouterAdmin extends Configured implements Tool { private static final Logger LOG = LoggerFactory.getLogger(RouterAdmin.class); + private static final String DUMP_COMMAND = "-dumpState"; private RouterClient client; @@ -133,7 +142,7 @@ public class RouterAdmin extends Configured implements Tool { String[] commands = {"-add", "-update", "-rm", "-ls", "-getDestination", "-setQuota", "-setStorageTypeQuota", "-clrQuota", "-clrStorageTypeQuota", - "-safemode", "-nameservice", "-getDisabledNameservices", + DUMP_COMMAND, "-safemode", "-nameservice", "-getDisabledNameservices", "-refresh", "-refreshRouterArgs", "-refreshSuperUserGroupsConfiguration", "-refreshCallQueue"}; StringBuilder usage = new StringBuilder(); @@ -187,6 +196,8 @@ public class RouterAdmin extends Configured implements Tool { return "\t[-refreshSuperUserGroupsConfiguration]"; } else if (cmd.equals("-refreshCallQueue")) { return "\t[-refreshCallQueue]"; + } else if (cmd.equals(DUMP_COMMAND)) { + return "\t[" + DUMP_COMMAND + "]"; } return getUsage(null); } @@ -224,7 +235,8 @@ public class RouterAdmin extends Configured implements Tool { if (arg.length > 1) { throw new IllegalArgumentException("No arguments allowed"); } - } else if (arg[0].equals("-refreshCallQueue")) { + } 
else if (arg[0].equals("-refreshCallQueue") || + arg[0].equals(DUMP_COMMAND)) { if (arg.length > 1) { throw new IllegalArgumentException("No arguments allowed"); } @@ -286,6 +298,15 @@ public class RouterAdmin extends Configured implements Tool { return true; } + /** + * Does this command run in the local process? + * @param cmd the string of the command + * @return is this a local command? + */ + boolean isLocalCommand(String cmd) { + return DUMP_COMMAND.equals(cmd); + } + @Override public int run(String[] argv) throws Exception { if (argv.length < 1) { @@ -303,6 +324,10 @@ public class RouterAdmin extends Configured implements Tool { System.err.println("Not enough parameters specificed for cmd " + cmd); printUsage(cmd); return exitCode; + } else if (isLocalCommand(argv[0])) { + if (DUMP_COMMAND.equals(argv[0])) { + return dumpStateStore(getConf(), System.out) ? 0 : -1; + } } String address = null; // Initialize RouterClient @@ -1301,6 +1326,49 @@ public class RouterAdmin extends Configured implements Tool { return returnCode; } + /** + * Dumps the contents of the StateStore to stdout. + * @return true if it was successful + */ + public static boolean dumpStateStore(Configuration conf, + PrintStream output) throws IOException { + StateStoreService service = new StateStoreService(); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, false); + service.init(conf); + service.loadDriver(); + if (!service.isDriverReady()) { + System.err.println("Can't initialize driver"); + return false; + } + // Get the stores sorted by name + Map<String, RecordStore<? extends BaseRecord>> stores = new TreeMap<>(); + for(RecordStore<? extends BaseRecord> store: service.getRecordStores()) { + String recordName = StateStoreUtils.getRecordName(store.getRecordClass()); + stores.put(recordName, store); + } + for (Entry<String, RecordStore<? extends BaseRecord>> pair: stores.entrySet()) { + String recordName = pair.getKey(); + RecordStore<? 
extends BaseRecord> store = pair.getValue(); + output.println("---- " + recordName + " ----"); + if (store instanceof CachedRecordStore) { + for (Object record: ((CachedRecordStore) store).getCachedRecords()) { + if (record instanceof BaseRecord && record instanceof PBRecord) { + BaseRecord baseRecord = (BaseRecord) record; + // Generate the pseudo-json format of the protobuf record + String recordString = ((PBRecord) record).getProto().toString(); + // Indent each line + recordString = " " + recordString.replaceAll("\n", "\n "); + output.println(String.format(" %s:", baseRecord.getPrimaryKey())); + output.println(recordString); + } + } + output.println(); + } + } + service.stop(); + return true; + } + /** * Normalize a path for that filesystem. * @@ -1341,4 +1409,4 @@ public class RouterAdmin extends Configured implements Tool { return mode; } } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md index 5a9c2fd4285..098c73a3b71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md @@ -328,6 +328,17 @@ To trigger a runtime-refresh of the resource specified by \<key\> on \<host:ipc\ [hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -refreshRouterArgs <host:ipc_port> <key> [arg1..argn] +### Router state dump + +To diagnose the current state of the routers, you can use the +`-dumpState` command. It generates a text dump of the records in the +State Store. Because it reads the local configuration to locate the +State Store, it is usually easiest to run it on one of the router +hosts. The command runs locally, so the routers do not need to be +running for it to work.
+ + [hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -dumpState + Client configuration -------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java index 677f3b5e947..761fad2fb7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -42,16 +42,20 @@ import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; import org.apache.hadoop.hdfs.server.federation.metrics.RBFMetrics; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.impl.DisabledNameserviceStoreImpl; import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl; import 
org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MockStateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.hdfs.tools.federation.RouterAdmin; import org.apache.hadoop.security.UserGroupInformation; @@ -852,6 +856,7 @@ public class TestRouterAdminCLI { + " <quota in bytes or quota size string>]\n" + "\t[-clrQuota <path>]\n" + "\t[-clrStorageTypeQuota <path>]\n" + + "\t[-dumpState]\n" + "\t[-safemode enter | leave | get]\n" + "\t[-nameservice enable | disable <nameservice>]\n" + "\t[-getDisabledNameservices]\n" @@ -1759,6 +1764,72 @@ public class TestRouterAdminCLI { assertTrue(err.toString().contains("No arguments allowed")); } + @Test + public void testDumpState() throws Exception { + MockStateStoreDriver driver = new MockStateStoreDriver(); + driver.clearAll(); + // Add two records for block1 + driver.put(MembershipState.newInstance("routerId", "ns1", + "ns1-ha1", "cluster1", "block1", "rpc1", + "service1", "lifeline1", "https", "nn01", + FederationNamenodeServiceState.ACTIVE, false), false, false); + driver.put(MembershipState.newInstance("routerId", "ns1", + "ns1-ha2", "cluster1", "block1", "rpc2", + "service2", "lifeline2", "https", "nn02", + FederationNamenodeServiceState.STANDBY, false), false, false); + Configuration conf = new Configuration(); + conf.setClass(RBFConfigKeys.FEDERATION_STORE_DRIVER_CLASS, + MockStateStoreDriver.class, + StateStoreDriver.class); + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + try (PrintStream stream = new PrintStream(buffer)) { + RouterAdmin.dumpStateStore(conf, stream); + } + final String expected = + "---- DisabledNameservice ----\n" + + "\n" + + "---- MembershipState ----\n" + + 
" ns1-ha1-ns1-routerId:\n" + + " dateCreated: XXX\n" + + " dateModified: XXX\n" + + " routerId: \"routerId\"\n" + + " nameserviceId: \"ns1\"\n" + + " namenodeId: \"ns1-ha1\"\n" + + " clusterId: \"cluster1\"\n" + + " blockPoolId: \"block1\"\n" + + " webAddress: \"nn01\"\n" + + " rpcAddress: \"rpc1\"\n" + + " serviceAddress: \"service1\"\n" + + " lifelineAddress: \"lifeline1\"\n" + + " state: \"ACTIVE\"\n" + + " isSafeMode: false\n" + + " webScheme: \"https\"\n" + + " \n" + + " ns1-ha2-ns1-routerId:\n" + + " dateCreated: XXX\n" + + " dateModified: XXX\n" + + " routerId: \"routerId\"\n" + + " nameserviceId: \"ns1\"\n" + + " namenodeId: \"ns1-ha2\"\n" + + " clusterId: \"cluster1\"\n" + + " blockPoolId: \"block1\"\n" + + " webAddress: \"nn02\"\n" + + " rpcAddress: \"rpc2\"\n" + + " serviceAddress: \"service2\"\n" + + " lifelineAddress: \"lifeline2\"\n" + + " state: \"STANDBY\"\n" + + " isSafeMode: false\n" + + " webScheme: \"https\"\n" + + " \n" + + "\n" + + "---- MountTable ----\n" + + "\n" + + "---- RouterState ----"; + // Replace the time values with XXX + assertEquals(expected, + buffer.toString().trim().replaceAll("[0-9]{4,}+", "XXX")); + } + private void addMountTable(String src, String nsId, String dst) throws Exception { String[] argv = new String[] {"-add", src, nsId, dst}; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java index 57185a0a600..9f600cb6f3f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/MockStateStoreDriver.java @@ -35,7 +35,7 @@ import java.util.Map; public class MockStateStoreDriver extends StateStoreBaseImpl { private 
boolean giveErrors = false; private boolean initialized = false; - private final Map<String, Map<String, BaseRecord>> valueMap = new HashMap<>(); + private static final Map<String, Map<String, BaseRecord>> VALUE_MAP = new HashMap<>(); @Override public boolean initDriver() { @@ -56,7 +56,7 @@ public class MockStateStoreDriver extends StateStoreBaseImpl { @Override public void close() throws Exception { - valueMap.clear(); + VALUE_MAP.clear(); initialized = false; } @@ -82,7 +82,7 @@ public class MockStateStoreDriver extends StateStoreBaseImpl { @SuppressWarnings("unchecked") public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException { checkErrors(); - Map<String, BaseRecord> map = valueMap.get(StateStoreUtils.getRecordName(clazz)); + Map<String, BaseRecord> map = VALUE_MAP.get(StateStoreUtils.getRecordName(clazz)); List<T> results = map != null ? new ArrayList<>((Collection<T>) map.values()) : new ArrayList<>(); return new QueryResult<>(results, System.currentTimeMillis()); @@ -96,7 +96,7 @@ public class MockStateStoreDriver extends StateStoreBaseImpl { checkErrors(); for (T record : records) { Map<String, BaseRecord> map = - valueMap.computeIfAbsent(StateStoreUtils.getRecordName(record.getClass()), + VALUE_MAP.computeIfAbsent(StateStoreUtils.getRecordName(record.getClass()), k -> new HashMap<>()); String key = record.getPrimaryKey(); BaseRecord oldRecord = map.get(key); @@ -110,10 +110,17 @@ public class MockStateStoreDriver extends StateStoreBaseImpl { return true; } + /** + * Clear all records from the store. 
+ */ + public void clearAll() { + VALUE_MAP.clear(); + } + @Override public <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException { checkErrors(); - return valueMap.remove(StateStoreUtils.getRecordName(clazz)) != null; + return VALUE_MAP.remove(StateStoreUtils.getRecordName(clazz)) != null; } @Override @@ -124,7 +131,7 @@ public class MockStateStoreDriver extends StateStoreBaseImpl { checkErrors(); int result = 0; Map<String, BaseRecord> map = - valueMap.get(StateStoreUtils.getRecordName(clazz)); + VALUE_MAP.get(StateStoreUtils.getRecordName(clazz)); if (map != null) { for (Iterator<BaseRecord> itr = map.values().iterator(); itr.hasNext();) { BaseRecord record = itr.next(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java index 4289999429b..8226178fe76 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestRouterState.java @@ -101,6 +101,7 @@ public class TestRouterState { conf.setBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, false); service.init(conf); MockStateStoreDriver driver = (MockStateStoreDriver) service.getDriver(); + driver.clearAll(); // Add two records for block1 driver.put(MembershipState.newInstance("routerId", "ns1", "ns1-ha1", "cluster1", "block1", "rpc1", --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org