This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new c68b029 Properly clean up TaskCurrentState sessions (#1736)
c68b029 is described below
commit c68b029e011d1181b2095c6f249869c678fa3123
Author: Neal Sun <[email protected]>
AuthorDate: Tue May 18 11:23:41 2021 -0700
Properly clean up TaskCurrentState sessions (#1736)
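This PR makes TaskCurrentState session cleanup independent of regular
current-state session cleanup. Previously, task current states were deleted
only within the loop that removes regular current states from previous
sessions; they are now removed for every previous session found under the
instance's TaskCurrentStates.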
Co-authored-by: Neal Sun <[email protected]>
---
.../main/java/org/apache/helix/PropertyKey.java | 10 +++++
.../helix/manager/zk/ParticipantManager.java | 17 ++++++++-
.../java/org/apache/helix/common/ZkTestBase.java | 9 ++++-
.../apache/helix/manager/zk/TestHandleSession.java | 43 ++++++++++++++++++++++
4 files changed, 76 insertions(+), 3 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 0427068..2cf1168 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -487,6 +487,16 @@ public class PropertyKey {
}
/**
+ * Get a property key associated with {@link CurrentState} of an instance.
This key is for
+ * TaskCurrentState specifically.
+ * @param instanceName
+ * @return {@link PropertyKey}
+ */
+ public PropertyKey taskCurrentStateSessions(String instanceName) {
+ return new PropertyKey(TASKCURRENTSTATES, CurrentState.class,
_clusterName, instanceName);
+ }
+
+ /**
* Get a property key associated with {@link CurrentState} of an instance
and session. This key
* is for TaskCurrentState specifically.
* @param instanceName
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 3a58570..6e071f6 100644
---
a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++
b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -154,6 +154,7 @@ public class ParticipantManager {
// should not be created by an expired zk session.
createLiveInstance();
carryOverPreviousCurrentState();
+ removePreviousTaskCurrentStates();
/**
* setup message listener
@@ -362,6 +363,7 @@ public class ParticipantManager {
}
// If the the current state is related to tasks, there is no need to
carry it over to new session.
+ // Note: this check is not necessary due to TaskCurrentStates, but
keep it for backwards compatibility
if (stateModelDefRef.equals(TaskConstants.STATE_MODEL_NAME)) {
continue;
}
@@ -418,7 +420,20 @@ public class ParticipantManager {
String path = _keyBuilder.currentStates(_instanceName,
session).getPath();
LOG.info("Removing current states from previous sessions. path: " +
path);
_zkclient.deleteRecursively(path);
- path = _keyBuilder.taskCurrentStates(_instanceName, session).getPath();
+ }
+ }
+
+ /**
+ * Remove all previous task current state sessions
+ */
+ private void removePreviousTaskCurrentStates() {
+ for (String session : _dataAccessor
+ .getChildNames(_keyBuilder.taskCurrentStateSessions(_instanceName))) {
+ if (session.equals(_sessionId)) {
+ continue;
+ }
+
+ String path = _keyBuilder.taskCurrentStates(_instanceName,
session).getPath();
LOG.info("Removing task current states from previous sessions. path: " +
path);
_zkclient.deleteRecursively(path);
}
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index e832307..b1aadf6 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -929,8 +929,13 @@ public class ZkTestBase {
LOG.error("Current state not empty for " + participant);
return false;
}
- CurrentState taskCurrentState =
- accessor.getProperty(keyBuilder.taskCurrentState(participant,
sessionId, _resourceName));
+ }
+
+ List<String> taskSessionIds =
+
accessor.getChildNames(keyBuilder.taskCurrentStateSessions(participant));
+ for (String sessionId : taskSessionIds) {
+ CurrentState taskCurrentState = accessor
+ .getProperty(keyBuilder.taskCurrentState(participant, sessionId,
_resourceName));
Map<String, String> taskPartitionStateMap =
taskCurrentState.getPartitionStateMap();
if (taskPartitionStateMap != null &&
!taskPartitionStateMap.isEmpty()) {
LOG.error("Task current state not empty for " + participant);
diff --git
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
index 7d55a69..ee2f215 100644
---
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
+++
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import org.apache.helix.AccessOption;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
@@ -42,6 +43,7 @@ import
org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -187,6 +189,47 @@ public class TestHandleSession extends ZkTestBase {
TestHelper.dropCluster(clusterName, _gZkClient);
}
+ @Test (dependsOnMethods = "testAcquireLeadershipOnNewSession")
+ public void testRemoveOldSession() throws Exception {
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = _className + "_" + methodName;
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ 10, // partitions per resource
+ 5, // number of nodes
+ 3, // replicas
+ "MasterSlave", true); // do rebalance
+
+ String testInstanceName = "localhost_12918";
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, clusterName, testInstanceName);
+ participant.syncStart();
+
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
+ String testCurrentStateSessionId = "testCurrentStateSessionId";
+ _baseAccessor
+ .create(keyBuilder.sessions(testInstanceName).toString() + "/" +
testCurrentStateSessionId,
+ new ZNRecord(testCurrentStateSessionId), AccessOption.PERSISTENT);
+ String testTaskCurrentStateSessionId = "testTaskCurrentStateSessionId";
+
_baseAccessor.create(keyBuilder.taskCurrentStateSessions(testInstanceName).toString()
+ "/"
+ + testTaskCurrentStateSessionId, new
ZNRecord(testTaskCurrentStateSessionId),
+ AccessOption.PERSISTENT);
+
+ ZkTestHelper.expireSession(participant.getZkClient());
+ // Ensure that the test sessions are removed
+
Assert.assertEquals(_gZkClient.getChildren(keyBuilder.sessions(testInstanceName).toString()),
+ Collections.emptyList());
+ Assert.assertEquals(
+
_gZkClient.getChildren(keyBuilder.taskCurrentStateSessions(testInstanceName).toString()),
+ Collections.emptyList());
+
+ participant.syncStop();
+ deleteCluster(clusterName);
+ }
+
/*
* Tests session expiry before calling
ZkHelixManager.handleNewSession(sessionId).
* This test checks to see if the expired sessions would be discarded and
the operation would