HDFS-13042. RBF: Heartbeat Router State. Contributed by Inigo Goiri.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7721fff7 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7721fff7 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7721fff7 Branch: refs/heads/YARN-1011 Commit: 7721fff74494eb7fbbbba7f8bb4b4692d880d301 Parents: eb2dd08 Author: Yiqun Lin <[email protected]> Authored: Thu Jan 25 15:51:26 2018 +0800 Committer: Yiqun Lin <[email protected]> Committed: Thu Jan 25 15:51:26 2018 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 + .../hdfs/server/federation/router/Router.java | 71 +++++++ .../router/RouterHeartbeatService.java | 155 +++++++++++++++ .../federation/router/RouterServiceState.java | 2 +- .../federation/store/StateStoreService.java | 9 + .../src/main/resources/hdfs-default.xml | 20 ++ .../store/TestStateStoreRouterState.java | 194 +++++++++++++++++++ 7 files changed, 458 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index f53badc..84215f3f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1222,6 +1222,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_ROUTER_MONITOR_LOCAL_NAMENODE = FEDERATION_ROUTER_PREFIX + "monitor.localnamenode.enable"; public static final boolean DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT = true; + public static final String DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS = + FEDERATION_ROUTER_PREFIX + "heartbeat-state.interval"; + public static final long DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS_DEFAULT = + TimeUnit.SECONDS.toMillis(5); // HDFS Router NN client public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE = @@ -1282,6 +1286,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { FEDERATION_STORE_PREFIX + "membership.expiration"; public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT = TimeUnit.MINUTES.toMillis(5); + public static final String FEDERATION_STORE_ROUTER_EXPIRATION_MS = + FEDERATION_STORE_PREFIX + "router.expiration"; + public static final long FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT = + TimeUnit.MINUTES.toMillis(5); // HDFS Router-based federation mount table entries /** Maximum number of cache entries to have. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java index ea8a1c0..1e72c93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -37,11 +37,13 @@ import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.store.RouterStore; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.JvmPauseMonitor; +import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,6 +114,18 @@ public class Router extends CompositeService { /** Quota cache manager. */ private RouterQuotaManager quotaManager; + /** Manages the current state of the router. */ + private RouterStore routerStateManager; + /** Heartbeat our run status to the router state manager. */ + private RouterHeartbeatService routerHeartbeatService; + + /** The start time of the namesystem. */ + private final long startTime = Time.now(); + + /** State of the Router. */ + private RouterServiceState state = RouterServiceState.UNINITIALIZED; + + ///////////////////////////////////////////////////////// // Constructor ///////////////////////////////////////////////////////// @@ -127,6 +141,7 @@ public class Router extends CompositeService { @Override protected void serviceInit(Configuration configuration) throws Exception { this.conf = configuration; + updateRouterState(RouterServiceState.INITIALIZING); if (conf.getBoolean( DFSConfigKeys.DFS_ROUTER_STORE_ENABLE, @@ -188,6 +203,10 @@ public class Router extends CompositeService { if (this.namenodeHearbeatServices.isEmpty()) { LOG.error("Heartbeat is enabled but there are no namenodes to monitor"); } + + // Periodically update the router state + this.routerHeartbeatService = new RouterHeartbeatService(this); + addService(this.routerHeartbeatService); } // Router metrics system @@ -219,6 +238,8 @@ public class Router extends CompositeService { @Override protected void serviceStart() throws Exception { + updateRouterState(RouterServiceState.RUNNING); + if (this.pauseMonitor != null) { this.pauseMonitor.start(); JvmMetrics jvmMetrics = this.metrics.getJvmMetrics(); @@ -233,6 +254,9 @@ public class Router extends CompositeService { @Override protected void serviceStop() throws Exception { + // Update state + updateRouterState(RouterServiceState.SHUTDOWN); + // JVM pause monitor if (this.pauseMonitor != null) { this.pauseMonitor.stop(); @@ -454,6 +478,31 @@ public class Router extends CompositeService { } ///////////////////////////////////////////////////////// + // Router State Management + ///////////////////////////////////////////////////////// + + /** + * Update the router state and heartbeat to the state store. + * + * @param state The new router state. + */ + public void updateRouterState(RouterServiceState newState) { + this.state = newState; + if (this.routerHeartbeatService != null) { + this.routerHeartbeatService.updateStateAsync(); + } + } + + /** + * Get the status of the router. + * + * @return Status of the router. + */ + public RouterServiceState getRouterState() { + return this.state; + } + + ///////////////////////////////////////////////////////// // Submodule getters ///////////////////////////////////////////////////////// @@ -508,11 +557,33 @@ public class Router extends CompositeService { return this.namenodeResolver; } + /** + * Get the state store interface for the router heartbeats. + * + * @return FederationRouterStateStore state store API handle. + */ + public RouterStore getRouterStateManager() { + if (this.routerStateManager == null && this.stateStore != null) { + this.routerStateManager = this.stateStore.getRegisteredRecordStore( + RouterStore.class); + } + return this.routerStateManager; + } + ///////////////////////////////////////////////////////// // Router info ///////////////////////////////////////////////////////// /** + * Get the start date of the Router. + * + * @return Start date of the router. + */ + public long getStartTime() { + return this.startTime; + } + + /** * Unique ID for the router, typically the hostname:port string for the * router's RPC server. This ID may be null on router startup before the RPC * server has bound to a port. http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java new file mode 100644 index 0000000..86a6210 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.store.CachedRecordStore; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; +import org.apache.hadoop.hdfs.server.federation.store.RecordStore; +import org.apache.hadoop.hdfs.server.federation.store.RouterStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.hdfs.server.federation.store.records.StateStoreVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service to periodically update the Router current state in the State Store. + */ +public class RouterHeartbeatService extends PeriodicService { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterHeartbeatService.class); + + /** Router we are hearbeating. */ + private final Router router; + + /** + * Create a new Router heartbeat service. + * + * @param router Router to heartbeat. + */ + public RouterHeartbeatService(Router router) { + super(RouterHeartbeatService.class.getSimpleName()); + this.router = router; + } + + /** + * Trigger the update of the Router state asynchronously. + */ + protected void updateStateAsync() { + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + updateStateStore(); + } + }, "Router Heartbeat Async"); + thread.setDaemon(true); + thread.start(); + } + + /** + * Update the state of the Router in the State Store. + */ + private synchronized void updateStateStore() { + String routerId = router.getRouterId(); + if (routerId == null) { + LOG.error("Cannot heartbeat for router: unknown router id"); + return; + } + RouterStore routerStore = router.getRouterStateManager(); + if (routerStore != null) { + try { + RouterState record = RouterState.newInstance( + routerId, router.getStartTime(), router.getRouterState()); + StateStoreVersion stateStoreVersion = StateStoreVersion.newInstance( + getStateStoreVersion(MembershipStore.class), + getStateStoreVersion(MountTableStore.class)); + record.setStateStoreVersion(stateStoreVersion); + RouterHeartbeatRequest request = + RouterHeartbeatRequest.newInstance(record); + RouterHeartbeatResponse response = routerStore.routerHeartbeat(request); + if (!response.getStatus()) { + LOG.warn("Cannot heartbeat router {}", routerId); + } else { + LOG.debug("Router heartbeat for router {}", routerId); + } + } catch (IOException e) { + LOG.error("Cannot heartbeat router {}: {}", routerId, e.getMessage()); + } + } else { + LOG.warn("Cannot heartbeat router {}: State Store unavailable", routerId); + } + } + + /** + * Get the version of the data in the State Store. + * + * @param clazz Class in the State Store. + * @return Version of the data. + */ + private <R extends BaseRecord, S extends RecordStore<R>> + long getStateStoreVersion(final Class<S> clazz) { + long version = -1; + try { + StateStoreService stateStore = router.getStateStore(); + S recordStore = stateStore.getRegisteredRecordStore(clazz); + if (recordStore != null) { + if (recordStore instanceof CachedRecordStore) { + CachedRecordStore<R> cachedRecordStore = + (CachedRecordStore<R>) recordStore; + List<R> records = cachedRecordStore.getCachedRecords(); + for (BaseRecord record : records) { + if (record.getDateModified() > version) { + version = record.getDateModified(); + } + } + } + } + } catch (Exception e) { + LOG.error("Cannot get version for {}: {}", clazz, e.getMessage()); + } + return version; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + + long interval = conf.getTimeDuration( + DFSConfigKeys.DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS, + DFSConfigKeys.DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS_DEFAULT, + TimeUnit.MILLISECONDS); + this.setIntervalMs(interval); + + super.serviceInit(conf); + } + + @Override + public void periodicInvoke() { + updateStateStore(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java index 25a6466..3accbe9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterServiceState.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hdfs.server.federation.router; * States of the Router. */ public enum RouterServiceState { - NONE, + UNINITIALIZED, INITIALIZING, SAFEMODE, RUNNING, http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java index 0289ba6..aa730ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; @@ -38,8 +39,10 @@ import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl; import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl; +import org.apache.hadoop.hdfs.server.federation.store.impl.RouterStoreImpl; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; import org.apache.hadoop.metrics2.MetricsException; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.service.CompositeService; @@ -148,6 +151,7 @@ public class StateStoreService extends CompositeService { // Add supported record stores addRecordStore(MembershipStoreImpl.class); addRecordStore(MountTableStoreImpl.class); + addRecordStore(RouterStoreImpl.class); // Check the connection to the State Store periodically this.monitorService = new StateStoreConnectionMonitorService(this); @@ -158,6 +162,11 @@ public class StateStoreService extends CompositeService { DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, DFSConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT)); + RouterState.setExpirationMs(conf.getTimeDuration( + DFSConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS, + DFSConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT, + TimeUnit.MILLISECONDS)); + // Cache update service this.cacheUpdater = new StateStoreCacheUpdateService(this); addService(this.cacheUpdater); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 7a23eb4..d24310e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -5111,6 +5111,26 @@ </property> <property> + <name>dfs.federation.router.heartbeat-state.interval</name> + <value>5s</value> + <description> + How often the Router should heartbeat its state into the State Store in + milliseconds. This setting supports multiple time unit suffixes as + described in dfs.federation.router.quota-cache.update.interval. + </description> + </property> + + <property> + <name>dfs.federation.router.store.router.expiration</name> + <value>5m</value> + <description> + Expiration time in milliseconds for a router state record. This setting + supports multiple time unit suffixes as described in + dfs.federation.router.quota-cache.update.interval. + </description> + </property> + + <property> <name>dfs.federation.router.monitor.namenode</name> <value></value> <description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/7721fff7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java new file mode 100644 index 0000000..ae15ef6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyException; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearRecords; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; +import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; +import org.apache.hadoop.util.Time; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test the basic {@link StateStoreService} {@link RouterStore} functionality. + */ +public class TestStateStoreRouterState extends TestStateStoreBase { + + private static RouterStore routerStore; + + @BeforeClass + public static void create() { + // Reduce expirations to 5 seconds + getConf().setTimeDuration( + DFSConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS, + 5, TimeUnit.SECONDS); + } + + @Before + public void setup() throws IOException, InterruptedException { + + if (routerStore == null) { + routerStore = + getStateStore().getRegisteredRecordStore(RouterStore.class); + } + + // Clear router status registrations + assertTrue(clearRecords(getStateStore(), RouterState.class)); + } + + @Test + public void testStateStoreDisconnected() throws Exception { + + // Close the data store driver + getStateStore().closeDriver(); + assertEquals(false, getStateStore().isDriverReady()); + + // Test all APIs that access the data store to ensure they throw the correct + // exception. + GetRouterRegistrationRequest getSingleRequest = + GetRouterRegistrationRequest.newInstance(); + verifyException(routerStore, "getRouterRegistration", + StateStoreUnavailableException.class, + new Class[] {GetRouterRegistrationRequest.class}, + new Object[] {getSingleRequest}); + + GetRouterRegistrationsRequest getRequest = + GetRouterRegistrationsRequest.newInstance(); + routerStore.loadCache(true); + verifyException(routerStore, "getRouterRegistrations", + StateStoreUnavailableException.class, + new Class[] {GetRouterRegistrationsRequest.class}, + new Object[] {getRequest}); + + RouterHeartbeatRequest hbRequest = RouterHeartbeatRequest.newInstance( + RouterState.newInstance("test", 0, RouterServiceState.UNINITIALIZED)); + verifyException(routerStore, "routerHeartbeat", + StateStoreUnavailableException.class, + new Class[] {RouterHeartbeatRequest.class}, + new Object[] {hbRequest}); + } + + // + // Router + // + @Test + public void testUpdateRouterStatus() + throws IllegalStateException, IOException { + + long dateStarted = Time.now(); + String address = "testaddress"; + + // Set + RouterHeartbeatRequest request = RouterHeartbeatRequest.newInstance( + RouterState.newInstance( + address, dateStarted, RouterServiceState.RUNNING)); + assertTrue(routerStore.routerHeartbeat(request).getStatus()); + + // Verify + GetRouterRegistrationRequest getRequest = + GetRouterRegistrationRequest.newInstance(address); + RouterState record = + routerStore.getRouterRegistration(getRequest).getRouter(); + assertNotNull(record); + assertEquals(RouterServiceState.RUNNING, record.getStatus()); + assertEquals(address, record.getAddress()); + assertEquals(FederationUtil.getCompileInfo(), record.getCompileInfo()); + // Build version may vary a bit + assertTrue(record.getBuildVersion().length() > 0); + } + + @Test + public void testRouterStateExpired() + throws IOException, InterruptedException { + + long dateStarted = Time.now(); + String address = "testaddress"; + + RouterHeartbeatRequest request = RouterHeartbeatRequest.newInstance( + RouterState.newInstance( + address, dateStarted, RouterServiceState.RUNNING)); + // Set + assertTrue(routerStore.routerHeartbeat(request).getStatus()); + + // Verify + GetRouterRegistrationRequest getRequest = + GetRouterRegistrationRequest.newInstance(address); + RouterState record = + routerStore.getRouterRegistration(getRequest).getRouter(); + assertNotNull(record); + + // Wait past expiration (set to 5 sec in config) + Thread.sleep(6000); + + // Verify expired + RouterState r = routerStore.getRouterRegistration(getRequest).getRouter(); + assertEquals(RouterServiceState.EXPIRED, r.getStatus()); + + // Heartbeat again and this shouldn't be EXPIRED anymore + assertTrue(routerStore.routerHeartbeat(request).getStatus()); + r = routerStore.getRouterRegistration(getRequest).getRouter(); + assertEquals(RouterServiceState.RUNNING, r.getStatus()); + } + + @Test + public void testGetAllRouterStates() + throws StateStoreUnavailableException, IOException { + + // Set 2 entries + RouterHeartbeatRequest heartbeatRequest1 = + RouterHeartbeatRequest.newInstance( + RouterState.newInstance( + "testaddress1", Time.now(), RouterServiceState.RUNNING)); + assertTrue(routerStore.routerHeartbeat(heartbeatRequest1).getStatus()); + + RouterHeartbeatRequest heartbeatRequest2 = + RouterHeartbeatRequest.newInstance( + RouterState.newInstance( + "testaddress2", Time.now(), RouterServiceState.RUNNING)); + assertTrue(routerStore.routerHeartbeat(heartbeatRequest2).getStatus()); + + // Verify + routerStore.loadCache(true); + GetRouterRegistrationsRequest request = + GetRouterRegistrationsRequest.newInstance(); + List<RouterState> entries = + routerStore.getRouterRegistrations(request).getRouters(); + assertEquals(2, entries.size()); + Collections.sort(entries); + assertEquals("testaddress1", entries.get(0).getAddress()); + assertEquals("testaddress2", entries.get(1).getAddress()); + assertEquals(RouterServiceState.RUNNING, entries.get(0).getStatus()); + assertEquals(RouterServiceState.RUNNING, entries.get(1).getStatus()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
