This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 64d4abf HDFS-14593. RBF: Implement deletion feature for expired
records in State Store. Contributed by Takanobu Asanuma.
64d4abf is described below
commit 64d4abf489a0267a265591026f8e6c84bc78591e
Author: Ayush Saxena <[email protected]>
AuthorDate: Mon Jul 15 22:38:00 2019 +0530
HDFS-14593. RBF: Implement deletion feature for expired records in State
Store. Contributed by Takanobu Asanuma.
---
.../server/federation/router/RBFConfigKeys.java | 8 ++
.../server/federation/store/CachedRecordStore.java | 22 ++++-
.../server/federation/store/StateStoreService.java | 16 +++-
.../federation/store/records/BaseRecord.java | 46 +++++++++-
.../federation/store/records/MembershipState.java | 21 +++++
.../federation/store/records/QueryResult.java | 3 +-
.../federation/store/records/RouterState.java | 17 ++++
.../records/impl/pb/MembershipStatePBImpl.java | 4 +-
.../store/records/impl/pb/RouterStatePBImpl.java | 4 +-
.../src/main/resources/hdfs-rbf-default.xml | 20 +++++
.../store/TestStateStoreMembershipState.java | 100 ++++++++++++++++++---
.../store/TestStateStoreRouterState.java | 61 ++++++++++---
.../store/driver/TestStateStoreDriverBase.java | 5 +-
13 files changed, 289 insertions(+), 38 deletions(-)
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 1daebdc..a2bec12 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -201,10 +201,18 @@ public class RBFConfigKeys extends
CommonConfigurationKeysPublic {
FEDERATION_STORE_PREFIX + "membership.expiration";
public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(5);
+ public static final String FEDERATION_STORE_MEMBERSHIP_EXPIRATION_DELETION_MS
+ = FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS + ".deletion";
+ public static final long
+ FEDERATION_STORE_MEMBERSHIP_EXPIRATION_DELETION_MS_DEFAULT = -1;
public static final String FEDERATION_STORE_ROUTER_EXPIRATION_MS =
FEDERATION_STORE_PREFIX + "router.expiration";
public static final long FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(5);
+ public static final String FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS =
+ FEDERATION_STORE_ROUTER_EXPIRATION_MS + ".deletion";
+ public static final long
+ FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS_DEFAULT = -1;
// HDFS Router safe mode
public static final String DFS_ROUTER_SAFEMODE_ENABLE =
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
index 5cfb521..7b28c03 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.federation.store;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Lock;
@@ -164,13 +163,15 @@ public abstract class CachedRecordStore<R extends
BaseRecord>
/**
* Updates the state store with any record overrides we detected, such as an
- * expired state.
+ * expired state. If an expired record exists beyond deletion time, it is
+ * removed.
*
* @param query RecordQueryResult containing the data to be inspected.
* @throws IOException If the values cannot be updated.
*/
public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
List<R> commitRecords = new ArrayList<>();
+ List<R> deleteRecords = new ArrayList<>();
List<R> newRecords = query.getRecords();
long currentDriverTime = query.getTimestamp();
if (newRecords == null || currentDriverTime <= 0) {
@@ -178,7 +179,16 @@ public abstract class CachedRecordStore<R extends
BaseRecord>
return;
}
for (R record : newRecords) {
- if (record.checkExpired(currentDriverTime)) {
+ if (record.shouldBeDeleted(currentDriverTime)) {
+ String recordName = StateStoreUtils.getRecordName(record.getClass());
+ if (getDriver().remove(record)) {
+ deleteRecords.add(record);
+ LOG.info("Deleted State Store record {}: {}", recordName, record);
+ } else {
+ LOG.warn("Couldn't delete State Store record {}: {}", recordName,
+ record);
+ }
+ } else if (record.checkExpired(currentDriverTime)) {
String recordName = StateStoreUtils.getRecordName(record.getClass());
LOG.info("Override State Store record {}: {}", recordName, record);
commitRecords.add(record);
@@ -187,6 +197,9 @@ public abstract class CachedRecordStore<R extends
BaseRecord>
if (commitRecords.size() > 0) {
getDriver().putAll(commitRecords, true, false);
}
+ if (deleteRecords.size() > 0) {
+ newRecords.removeAll(deleteRecords);
+ }
}
/**
@@ -197,7 +210,8 @@ public abstract class CachedRecordStore<R extends
BaseRecord>
* @throws IOException If the values cannot be updated.
*/
public void overrideExpiredRecord(R record) throws IOException {
- List<R> newRecords = Collections.singletonList(record);
+ List<R> newRecords = new ArrayList<>();
+ newRecords.add(record);
long time = getDriver().getTime();
QueryResult<R> query = new QueryResult<>(newRecords, time);
overrideExpiredRecords(query);
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
index 37b62fb..66c2882 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java
@@ -160,15 +160,27 @@ public class StateStoreService extends CompositeService {
this.addService(monitorService);
// Set expirations intervals for each record
- MembershipState.setExpirationMs(conf.getLong(
+ MembershipState.setExpirationMs(conf.getTimeDuration(
RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
- RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT));
+ RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT,
+ TimeUnit.MILLISECONDS));
+
+ MembershipState.setDeletionMs(conf.getTimeDuration(
+ RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_DELETION_MS,
+ RBFConfigKeys
+ .FEDERATION_STORE_MEMBERSHIP_EXPIRATION_DELETION_MS_DEFAULT,
+ TimeUnit.MILLISECONDS));
RouterState.setExpirationMs(conf.getTimeDuration(
RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS,
RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT,
TimeUnit.MILLISECONDS));
+ RouterState.setDeletionMs(conf.getTimeDuration(
+ RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS,
+ RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS_DEFAULT,
+ TimeUnit.MILLISECONDS));
+
// Cache update service
this.cacheUpdater = new StateStoreCacheUpdateService(this);
addService(this.cacheUpdater);
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java
index 7212f3a..86721ea 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/BaseRecord.java
@@ -75,6 +75,26 @@ public abstract class BaseRecord implements
Comparable<BaseRecord> {
public abstract long getExpirationMs();
/**
+ * Check if this record is expired. The default is false. Override for
+ * customized behavior.
+ *
+ * @return True if the record is expired.
+ */
+ public boolean isExpired() {
+ return false;
+ }
+
+ /**
+ * Get the deletion time for the expired record. The default is disabled.
+ * Override for customized behavior.
+ *
+ * @return Deletion time for the expired record.
+ */
+ public long getDeletionMs() {
+ return -1;
+ }
+
+ /**
* Map of primary key names to values for the record. The primary key can be
* a combination of 1-n different State Store serialized values.
*
@@ -202,13 +222,35 @@ public abstract class BaseRecord implements
Comparable<BaseRecord> {
*/
public boolean checkExpired(long currentTime) {
long expiration = getExpirationMs();
- if (getDateModified() > 0 && expiration > 0) {
- return (getDateModified() + expiration) < currentTime;
+ long modifiedTime = getDateModified();
+ if (modifiedTime > 0 && expiration > 0) {
+ return (modifiedTime + expiration) < currentTime;
}
return false;
}
/**
+ * Check if this record should be deleted from the data store. This only
+ * applies when the record is expired and expired deletion is enabled. If an
+ * expired record exists beyond the deletion time, it should be deleted.
+ *
+ * @param currentTime The current timestamp in ms from the data store, to be
+ * compared against the modification date of the object plus
+ * its expiration time.
+ * @return boolean True if the record is expired and has existed beyond the
+ * deletion time, so it should be deleted from the data store.
+ */
+ public boolean shouldBeDeleted(long currentTime) {
+ long deletionTime = getDeletionMs();
+ if (isExpired() && deletionTime > 0) {
+ long elapsedTime = currentTime - (getDateModified() + getExpirationMs());
+ return elapsedTime > deletionTime;
+ } else {
+ return false;
+ }
+ }
+
+ /**
* Validates the record. Called when the record is created, populated from
the
* state store, and before committing to the state store. If validate failed,
* there throws an exception.
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java
index 642c72b..8b9f713 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java
@@ -48,6 +48,8 @@ public abstract class MembershipState extends BaseRecord
/** Expiration time in ms for this entry. */
private static long expirationMs;
+ /** Deletion time in ms for this expired entry. */
+ private static long deletionMs;
/** Comparator based on the name.*/
public static final Comparator<MembershipState> NAME_COMPARATOR =
@@ -330,4 +332,23 @@ public abstract class MembershipState extends BaseRecord
public static void setExpirationMs(long time) {
MembershipState.expirationMs = time;
}
+
+ @Override
+ public boolean isExpired() {
+ return getState() == EXPIRED;
+ }
+
+ @Override
+ public long getDeletionMs() {
+ return MembershipState.deletionMs;
+ }
+
+ /**
+ * Set the deletion time for this class.
+ *
+ * @param time Deletion time in milliseconds.
+ */
+ public static void setDeletionMs(long time) {
+ MembershipState.deletionMs = time;
+ }
}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/QueryResult.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/QueryResult.java
index 64c2c71..a5cda4b 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/QueryResult.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/QueryResult.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.federation.store.records;
-import java.util.Collections;
import java.util.List;
/**
@@ -42,7 +41,7 @@ public class QueryResult<T extends BaseRecord> {
* @return List of records.
*/
public List<T> getRecords() {
- return Collections.unmodifiableList(this.records);
+ return this.records;
}
/**
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java
index 2fe6941..761e2a4 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/RouterState.java
@@ -40,6 +40,9 @@ public abstract class RouterState extends BaseRecord {
/** Expiration time in ms for this entry. */
private static long expirationMs;
+ /** Deletion time in ms for this entry when it is expired. */
+ private static long deletionMs;
+
/**
* Constructors.
*/
@@ -169,4 +172,18 @@ public abstract class RouterState extends BaseRecord {
public static void setExpirationMs(long time) {
RouterState.expirationMs = time;
}
+
+ @Override
+ public boolean isExpired() {
+ return getStatus() == RouterServiceState.EXPIRED;
+ }
+
+ @Override
+ public long getDeletionMs() {
+ return RouterState.deletionMs;
+ }
+
+ public static void setDeletionMs(long time) {
+ RouterState.deletionMs = time;
+ }
}
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java
index 614957b..4e2868a 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java
@@ -315,7 +315,9 @@ public class MembershipStatePBImpl extends MembershipState
implements PBRecord {
@Override
public void setDateModified(long time) {
- this.translator.getBuilder().setDateModified(time);
+ if (getState() != FederationNamenodeServiceState.EXPIRED) {
+ this.translator.getBuilder().setDateModified(time);
+ }
}
@Override
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java
index d837386..107996c 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/RouterStatePBImpl.java
@@ -182,7 +182,9 @@ public class RouterStatePBImpl extends RouterState
implements PBRecord {
@Override
public void setDateModified(long time) {
- this.translator.getBuilder().setDateModified(time);
+ if (getStatus() != RouterServiceState.EXPIRED) {
+ this.translator.getBuilder().setDateModified(time);
+ }
}
@Override
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index 3f743f9..fba0869 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -346,6 +346,16 @@
</property>
<property>
+ <name>dfs.federation.router.store.membership.expiration.deletion</name>
+ <value>-1</value>
+ <description>
+ Deletion time in milliseconds for a membership record. If an expired
+ membership record exists beyond this time, it will be deleted. If this
+ value is zero or negative, the deletion is disabled.
+ </description>
+ </property>
+
+ <property>
<name>dfs.federation.router.heartbeat.enable</name>
<value>true</value>
<description>
@@ -392,6 +402,16 @@
</property>
<property>
+ <name>dfs.federation.router.store.router.expiration.deletion</name>
+ <value>-1</value>
+ <description>
+ Deletion time in milliseconds for a router state record. If an expired
+ router state record exists beyond this time, it will be deleted. If this
+ value is zero or negative, the deletion is disabled.
+ </description>
+ </property>
+
+ <property>
<name>dfs.federation.router.safemode.enable</name>
<value>true</value>
<description>
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java
index dd349da..f1f15c6 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java
@@ -34,6 +34,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
@@ -43,6 +44,7 @@ import
org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeat
import
org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatResponse;
import
org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateNamenodeRegistrationRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -57,10 +59,14 @@ public class TestStateStoreMembershipState extends
TestStateStoreBase {
@BeforeClass
public static void create() {
- // Reduce expirations to 5 seconds
+ // Reduce expirations to 2 seconds
getConf().setLong(
RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS,
- TimeUnit.SECONDS.toMillis(5));
+ TimeUnit.SECONDS.toMillis(2));
+ // Set deletion time to 2 seconds
+ getConf().setLong(
+ RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_DELETION_MS,
+ TimeUnit.SECONDS.toMillis(2));
}
@Before
@@ -363,8 +369,8 @@ public class TestStateStoreMembershipState extends
TestStateStoreBase {
}
@Test
- public void testRegistrationExpired()
- throws InterruptedException, IOException {
+ public void testRegistrationExpiredAndDeletion()
+ throws InterruptedException, IOException, TimeoutException {
// Populate the state store with a single NN element
// 1) ns0:nn0 - Active
@@ -385,20 +391,32 @@ public class TestStateStoreMembershipState extends
TestStateStoreBase {
assertNotNull(quorumEntry);
assertEquals(ROUTERS[0], quorumEntry.getRouterId());
assertEquals(FederationNamenodeServiceState.ACTIVE,
quorumEntry.getState());
-
- // Wait past expiration (set in conf to 5 seconds)
- Thread.sleep(6000);
- // Reload cache
- assertTrue(getStateStore().loadCache(MembershipStore.class, true));
-
- // Verify entry is now expired and is no longer in the cache
- quorumEntry = getNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]);
+ quorumEntry = getExpiredNamenodeRegistration(report.getNameserviceId(),
+ report.getNamenodeId());
assertNull(quorumEntry);
+ // Wait past expiration (set in conf to 2 seconds)
+ GenericTestUtils.waitFor(() -> {
+ try {
+ assertTrue(getStateStore().loadCache(MembershipStore.class, true));
+ // Verify entry is expired and is no longer in the cache
+ return getNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]) == null;
+ } catch (IOException e) {
+ return false;
+ }
+ }, 100, 3000);
+
+ // Verify entry is in expired membership records
+ quorumEntry = getExpiredNamenodeRegistration(NAMESERVICES[0],
NAMENODES[0]);
+ assertNotNull(quorumEntry);
+
// Verify entry is now expired and can't be used by RPC service
quorumEntry = getNamenodeRegistration(
report.getNameserviceId(), report.getNamenodeId());
assertNull(quorumEntry);
+ quorumEntry = getExpiredNamenodeRegistration(
+ report.getNameserviceId(), report.getNamenodeId());
+ assertNotNull(quorumEntry);
// Heartbeat again, updates dateModified
assertTrue(namenodeHeartbeat(report));
@@ -411,6 +429,36 @@ public class TestStateStoreMembershipState extends
TestStateStoreBase {
assertNotNull(quorumEntry);
assertEquals(ROUTERS[0], quorumEntry.getRouterId());
assertEquals(FederationNamenodeServiceState.ACTIVE,
quorumEntry.getState());
+ quorumEntry = getExpiredNamenodeRegistration(
+ report.getNameserviceId(), report.getNamenodeId());
+ assertNull(quorumEntry);
+
+ // Wait past expiration (set in conf to 2 seconds)
+ GenericTestUtils.waitFor(() -> {
+ try {
+ assertTrue(getStateStore().loadCache(MembershipStore.class, true));
+ // Verify entry is expired and is no longer in the cache
+ return getNamenodeRegistration(NAMESERVICES[0], NAMENODES[0]) == null;
+ } catch (IOException e) {
+ return false;
+ }
+ }, 100, 3000);
+
+ // Verify entry is in expired membership records
+ quorumEntry = getExpiredNamenodeRegistration(NAMESERVICES[0],
NAMENODES[0]);
+ assertNotNull(quorumEntry);
+
+ // Wait past deletion (set in conf to 2 seconds)
+ GenericTestUtils.waitFor(() -> {
+ try {
+ assertTrue(getStateStore().loadCache(MembershipStore.class, true));
+ // Verify entry is deleted from even the expired membership records
+ return getExpiredNamenodeRegistration(NAMESERVICES[0], NAMENODES[0])
+ == null;
+ } catch (IOException e) {
+ return false;
+ }
+ }, 100, 3000);
}
/**
@@ -442,6 +490,34 @@ public class TestStateStoreMembershipState extends
TestStateStoreBase {
}
/**
+ * Get a single expired namenode membership record from the store.
+ *
+ * @param nsId The HDFS nameservice ID to search for
+ * @param nnId The HDFS namenode ID to search for
+ * @return The single expired NamenodeMembershipRecord that matches the query
+ * or null if not found.
+ * @throws IOException if the query could not be executed.
+ */
+ private MembershipState getExpiredNamenodeRegistration(
+ final String nsId, final String nnId) throws IOException {
+
+ MembershipState partial = MembershipState.newInstance();
+ partial.setNameserviceId(nsId);
+ partial.setNamenodeId(nnId);
+ GetNamenodeRegistrationsRequest request =
+ GetNamenodeRegistrationsRequest.newInstance(partial);
+ GetNamenodeRegistrationsResponse response =
+ membershipStore.getExpiredNamenodeRegistrations(request);
+
+ List<MembershipState> results = response.getNamenodeMemberships();
+ if (results != null && results.size() == 1) {
+ MembershipState record = results.get(0);
+ return record;
+ }
+ return null;
+ }
+
+ /**
* Register a namenode heartbeat with the state store.
*
* @param store FederationMembershipStateStore instance to retrieve the
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java
index db1df19..9b459f3 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreRouterState.java
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
@@ -36,6 +37,7 @@ import
org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistra
import
org.apache.hadoop.hdfs.server.federation.store.protocol.GetRouterRegistrationsRequest;
import
org.apache.hadoop.hdfs.server.federation.store.protocol.RouterHeartbeatRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -50,10 +52,14 @@ public class TestStateStoreRouterState extends
TestStateStoreBase {
@BeforeClass
public static void create() {
- // Reduce expirations to 5 seconds
+ // Reduce expirations to 2 seconds
getConf().setTimeDuration(
RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_MS,
- 5, TimeUnit.SECONDS);
+ 2, TimeUnit.SECONDS);
+ // Set deletion time to 2 seconds
+ getConf().setTimeDuration(
+ RBFConfigKeys.FEDERATION_STORE_ROUTER_EXPIRATION_DELETION_MS,
+ 2, TimeUnit.SECONDS);
}
@Before
@@ -130,8 +136,8 @@ public class TestStateStoreRouterState extends
TestStateStoreBase {
}
@Test
- public void testRouterStateExpired()
- throws IOException, InterruptedException {
+ public void testRouterStateExpiredAndDeletion()
+ throws IOException, InterruptedException, TimeoutException {
long dateStarted = Time.now();
String address = "testaddress";
@@ -149,17 +155,46 @@ public class TestStateStoreRouterState extends
TestStateStoreBase {
routerStore.getRouterRegistration(getRequest).getRouter();
assertNotNull(record);
- // Wait past expiration (set to 5 sec in config)
- Thread.sleep(6000);
-
- // Verify expired
- RouterState r = routerStore.getRouterRegistration(getRequest).getRouter();
- assertEquals(RouterServiceState.EXPIRED, r.getStatus());
-
- // Heartbeat again and this shouldn't be EXPIRED anymore
+ // Wait past expiration (set in conf to 2 seconds)
+ GenericTestUtils.waitFor(() -> {
+ try {
+ RouterState routerState = routerStore
+ .getRouterRegistration(getRequest).getRouter();
+ // Verify entry is expired
+ return routerState.getStatus() == RouterServiceState.EXPIRED;
+ } catch (IOException e) {
+ return false;
+ }
+ }, 100, 3000);
+
+ // Heartbeat again and this shouldn't be EXPIRED at this point
assertTrue(routerStore.routerHeartbeat(request).getStatus());
- r = routerStore.getRouterRegistration(getRequest).getRouter();
+ RouterState r = routerStore.getRouterRegistration(getRequest).getRouter();
assertEquals(RouterServiceState.RUNNING, r.getStatus());
+
+ // Wait past expiration (set in conf to 2 seconds)
+ GenericTestUtils.waitFor(() -> {
+ try {
+ RouterState routerState = routerStore
+ .getRouterRegistration(getRequest).getRouter();
+ // Verify entry is expired
+ return routerState.getStatus() == RouterServiceState.EXPIRED;
+ } catch (IOException e) {
+ return false;
+ }
+ }, 100, 3000);
+
+ // Wait past deletion (set in conf to 2 seconds)
+ GenericTestUtils.waitFor(() -> {
+ try {
+ RouterState routerState = routerStore
+ .getRouterRegistration(getRequest).getRouter();
+ // Verify entry is deleted
+ return routerState.getStatus() == null;
+ } catch (IOException e) {
+ return false;
+ }
+ }, 100, 3000);
}
@Test
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
index d6c829b..b3a9fb5 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
@@ -184,7 +184,10 @@ public class TestStateStoreDriverBase {
long now = stateStore.getDriver().getTime();
assertTrue(
committed.getDateCreated() <= now && committed.getDateCreated() > 0);
- assertTrue(committed.getDateModified() >= committed.getDateCreated());
+ // Expired records don't update the modification time, so skip this check
+ if (!committed.isExpired()) {
+ assertTrue(committed.getDateModified() >= committed.getDateCreated());
+ }
return ret;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]