This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new c68b029  Properly clean up TaskCurrentState sessions (#1736)
c68b029 is described below

commit c68b029e011d1181b2095c6f249869c678fa3123
Author: Neal Sun <[email protected]>
AuthorDate: Tue May 18 11:23:41 2021 -0700

    Properly clean up TaskCurrentState sessions (#1736)
    
    This PR makes TaskCurrentState session cleanup independent of regular CurrentState session cleanup.
    
    Co-authored-by: Neal Sun <[email protected]>
---
 .../main/java/org/apache/helix/PropertyKey.java    | 10 +++++
 .../helix/manager/zk/ParticipantManager.java       | 17 ++++++++-
 .../java/org/apache/helix/common/ZkTestBase.java   |  9 ++++-
 .../apache/helix/manager/zk/TestHandleSession.java | 43 ++++++++++++++++++++++
 4 files changed, 76 insertions(+), 3 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java 
b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 0427068..2cf1168 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -487,6 +487,16 @@ public class PropertyKey {
     }
 
     /**
+     * Get the property key of an instance's TaskCurrentState root. The child node names under this
+     * key are session IDs; pair one with {@code taskCurrentStates(instanceName, sessionId)}.
+     * @param instanceName name of the participant instance
+     * @return {@link PropertyKey}
+     */
+    public PropertyKey taskCurrentStateSessions(String instanceName) {
+      return new PropertyKey(TASKCURRENTSTATES, CurrentState.class, 
_clusterName, instanceName);
+    }
+
+    /**
      * Get a property key associated with {@link CurrentState} of an instance 
and session. This key
      * is for TaskCurrentState specifically.
      * @param instanceName
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 3a58570..6e071f6 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -154,6 +154,7 @@ public class ParticipantManager {
     // should not be created by an expired zk session.
     createLiveInstance();
     carryOverPreviousCurrentState();
+    removePreviousTaskCurrentStates();
 
     /**
      * setup message listener
@@ -362,6 +363,7 @@ public class ParticipantManager {
         }
 
         // If the the current state is related to tasks, there is no need to 
carry it over to new session.
+        // Note: this check is not necessary due to TaskCurrentStates, but 
keep it for backwards compatibility
         if (stateModelDefRef.equals(TaskConstants.STATE_MODEL_NAME)) {
           continue;
         }
@@ -418,7 +420,20 @@ public class ParticipantManager {
       String path = _keyBuilder.currentStates(_instanceName, 
session).getPath();
       LOG.info("Removing current states from previous sessions. path: " + 
path);
       _zkclient.deleteRecursively(path);
-      path = _keyBuilder.taskCurrentStates(_instanceName, session).getPath();
+    }
+  }
+
+  /**
+   * Remove all previous task current state sessions
+   */
+  private void removePreviousTaskCurrentStates() {
+    for (String session : _dataAccessor
+        .getChildNames(_keyBuilder.taskCurrentStateSessions(_instanceName))) {
+      if (session.equals(_sessionId)) {
+        continue;
+      }
+
+      String path = _keyBuilder.taskCurrentStates(_instanceName, 
session).getPath();
       LOG.info("Removing task current states from previous sessions. path: " + 
path);
       _zkclient.deleteRecursively(path);
     }
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java 
b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index e832307..b1aadf6 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -929,8 +929,13 @@ public class ZkTestBase {
             LOG.error("Current state not empty for " + participant);
             return false;
           }
-          CurrentState taskCurrentState =
-              accessor.getProperty(keyBuilder.taskCurrentState(participant, 
sessionId, _resourceName));
+        }
+
+        List<String> taskSessionIds =
+            
accessor.getChildNames(keyBuilder.taskCurrentStateSessions(participant));
+        for (String sessionId : taskSessionIds) {
+          CurrentState taskCurrentState = accessor
+              .getProperty(keyBuilder.taskCurrentState(participant, sessionId, 
_resourceName));
           Map<String, String> taskPartitionStateMap = 
taskCurrentState.getPartitionStateMap();
           if (taskPartitionStateMap != null && 
!taskPartitionStateMap.isEmpty()) {
             LOG.error("Task current state not empty for " + participant);
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
index 7d55a69..ee2f215 100644
--- 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
+++ 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.AccessOption;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
@@ -42,6 +43,7 @@ import 
org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -187,6 +189,47 @@ public class TestHandleSession extends ZkTestBase {
     TestHelper.dropCluster(clusterName, _gZkClient);
   }
 
+  @Test (dependsOnMethods = "testAcquireLeadershipOnNewSession")
+  public void testRemoveOldSession() throws Exception {
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = _className + "_" + methodName;
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        10, // partitions per resource
+        5, // number of nodes
+        3, // replicas
+        "MasterSlave", true); // do rebalance
+
+    String testInstanceName = "localhost_12918";
+    MockParticipantManager participant =
+        new MockParticipantManager(ZK_ADDR, clusterName, testInstanceName);
+    participant.syncStart();
+
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
+    String testCurrentStateSessionId = "testCurrentStateSessionId";
+    _baseAccessor
+        .create(keyBuilder.sessions(testInstanceName).toString() + "/" + 
testCurrentStateSessionId,
+            new ZNRecord(testCurrentStateSessionId), AccessOption.PERSISTENT);
+    String testTaskCurrentStateSessionId = "testTaskCurrentStateSessionId";
+    
_baseAccessor.create(keyBuilder.taskCurrentStateSessions(testInstanceName).toString()
 + "/"
+            + testTaskCurrentStateSessionId, new 
ZNRecord(testTaskCurrentStateSessionId),
+        AccessOption.PERSISTENT);
+
+    ZkTestHelper.expireSession(participant.getZkClient());
+    // Both fake session nodes must be gone (NOTE(review): no polling here; may race cleanup)
+    
Assert.assertEquals(_gZkClient.getChildren(keyBuilder.sessions(testInstanceName).toString()),
+        Collections.emptyList());
+    Assert.assertEquals(
+        
_gZkClient.getChildren(keyBuilder.taskCurrentStateSessions(testInstanceName).toString()),
+        Collections.emptyList());
+
+    participant.syncStop();
+    deleteCluster(clusterName);
+  }
+
   /*
    * Tests session expiry before calling 
ZkHelixManager.handleNewSession(sessionId).
    * This test checks to see if the expired sessions would be discarded and 
the operation would

Reply via email to