This is an automated email from the ASF dual-hosted git repository.
nealsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new dadaba13c implement util for cloud event (#2149)
dadaba13c is described below
commit dadaba13c3717ffbad619a9f4a064ed987ea2454
Author: xyuanlu <[email protected]>
AuthorDate: Fri Jun 24 09:09:47 2022 -0700
implement util for cloud event (#2149)
This change adds the implementation for HelixCloudEventUtil.
---
.../event/helix/DefaultCloudEventCallbackImpl.java | 77 +++++++++++---
.../cloud/event/helix/HelixCloudEventListener.java | 4 +-
.../cloud/event/helix/HelixEventHandlingUtil.java | 111 +++++++++++++++++----
.../event/TestDefaultCloudEventCallbackImpl.java | 111 +++++++++++++++++++++
4 files changed, 266 insertions(+), 37 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
index d9aa3b2d1..04ad4b798 100644
---
a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
+++
b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java
@@ -19,51 +19,102 @@ package org.apache.helix.cloud.event.helix;
* under the License.
*/
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.util.InstanceValidationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A default callback implementation class to be used in {@link
HelixCloudEventListener}
*/
public class DefaultCloudEventCallbackImpl {
+ private static final Logger LOG =
LoggerFactory.getLogger(DefaultCloudEventCallbackImpl.class);
+ private final String _instanceReason = "Cloud event in
DefaultCloudEventCallback at %s";
+ private final String _emmReason = "Cloud event EMM in
DefaultCloudEventCallback by %s at %s";
/**
- * Disable the instance
+ * Disable the instance and track the cloud event in map field
disabledInstancesWithInfo in
+ * cluster config. Will not re-disable the instance if the instance is
already disabled for
+ * another reason (so that we do not overwrite the disabled reason and then
enable this instance
+ * on un-pause).
* @param manager The helix manager associated with the listener
* @param eventInfo Detailed information about the event
*/
public void disableInstance(HelixManager manager, Object eventInfo) {
- // To be implemented
- throw new NotImplementedException();
+ String message = String.format(_instanceReason,
System.currentTimeMillis());
+ LOG.info("DefaultCloudEventCallbackImpl disable Instance {}",
manager.getInstanceName());
+ if (InstanceValidationUtil
+ .isEnabled(manager.getHelixDataAccessor(), manager.getInstanceName()))
{
+ manager.getClusterManagmentTool()
+ .enableInstance(manager.getClusterName(), manager.getInstanceName(),
false,
+ InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message);
+ }
+
HelixEventHandlingUtil.updateCloudEventOperationInClusterConfig(manager.getClusterName(),
+ manager.getInstanceName(),
manager.getHelixDataAccessor().getBaseDataAccessor(), false,
+ message);
}
/**
- * Enable the instance
+ * Remove tracked cloud event in cluster config and enable the instance
+ * We only enable instance that is disabled because of cloud event.
* @param manager The helix manager associated with the listener
* @param eventInfo Detailed information about the event
*/
public void enableInstance(HelixManager manager, Object eventInfo) {
- // To be implemented
- throw new NotImplementedException();
+ LOG.info("DefaultCloudEventCallbackImpl enable Instance {}",
manager.getInstanceName());
+ String instanceName = manager.getInstanceName();
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ String message = String.format(_instanceReason,
System.currentTimeMillis());
+ HelixEventHandlingUtil
+ .updateCloudEventOperationInClusterConfig(manager.getClusterName(),
instanceName,
+ manager.getHelixDataAccessor().getBaseDataAccessor(), true,
message);
+ if (HelixEventHandlingUtil.isInstanceDisabledForCloudEvent(instanceName,
accessor)) {
+
manager.getClusterManagmentTool().enableInstance(manager.getClusterName(),
instanceName, true,
+ InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message);
+ }
}
/**
- *
+ * Will enter MM when the cluster is not in MM
+ * TODO: we should add maintenance reason when EMM with cloud event
* @param manager The helix manager associated with the listener
* @param eventInfo Detailed information about the event
*/
public void enterMaintenanceMode(HelixManager manager, Object eventInfo) {
- // To be implemented
- throw new NotImplementedException();
+ if
(!manager.getClusterManagmentTool().isInMaintenanceMode(manager.getClusterName()))
{
+ LOG.info("DefaultCloudEventCallbackImpl enterMaintenanceMode by {}",
+ manager.getInstanceName());
+ manager.getClusterManagmentTool()
+ .manuallyEnableMaintenanceMode(manager.getClusterName(), true,
+ String.format(_emmReason, manager.getInstanceName(),
System.currentTimeMillis()),
+ null);
+ }
}
/**
- *
+ * Will exit MM when the cluster config tracks no ongoing cloud event being
handled
+ * TODO: we should also check the maintenance reason and only exit when EMM
is caused by cloud event
* @param manager The helix manager associated with the listener
* @param eventInfo Detailed information about the event
*/
public void exitMaintenanceMode(HelixManager manager, Object eventInfo) {
- // To be implemented
- throw new NotImplementedException();
+ ClusterConfig clusterConfig = manager.getHelixDataAccessor()
+
.getProperty(manager.getHelixDataAccessor().keyBuilder().clusterConfig());
+ if (HelixEventHandlingUtil.checkNoInstanceUnderCloudEvent(clusterConfig)) {
+ LOG.info("DefaultCloudEventCallbackImpl exitMaintenanceMode by {}",
+ manager.getInstanceName());
+ manager.getClusterManagmentTool()
+ .manuallyEnableMaintenanceMode(manager.getClusterName(), false,
+ String.format(_emmReason, manager.getInstanceName(),
System.currentTimeMillis()),
+ null);
+ } else {
+ LOG.info(
+ "DefaultCloudEventCallbackImpl will not exitMaintenanceMode as there
are {} instances under cloud event",
+ clusterConfig.getDisabledInstancesWithInfo().keySet().size());
+ }
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java
b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java
index 57d5a43b3..57e5eb1ee 100644
---
a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java
+++
b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java
@@ -107,9 +107,9 @@ public class HelixCloudEventListener implements
CloudEventListener {
try {
LOG.info("Loading class: " + implClassName);
implClass = (DefaultCloudEventCallbackImpl)
HelixUtil.loadClass(getClass(), implClassName)
- .newInstance();
+ .getConstructor().newInstance();
} catch (Exception e) {
- implClass = DefaultCloudEventCallbackImpl.class.newInstance();
+ implClass = new DefaultCloudEventCallbackImpl();
LOG.error(
"No cloud event callback implementation class found for: {}.
message: {}. Using default callback impl class instead.",
implClassName, e.getMessage());
diff --git
a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
index 7a17c3f2e..ee96a13ee 100644
---
a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
+++
b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java
@@ -19,40 +19,107 @@ package org.apache.helix.cloud.event.helix;
* under the License.
*/
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.util.ConfigStringUtil;
+import org.apache.helix.util.InstanceValidationUtil;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.DataUpdater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
- class HelixEventHandlingUtil {
+class HelixEventHandlingUtil {
+ private static Logger LOG =
LoggerFactory.getLogger(HelixEventHandlingUtil.class);
/**
- * Enable or disable an instance for cloud event.
- * It will enable/disable Helix for that instance. Also add the instance
cloud event info to
- * clusterConfig Znode when enable.
- * @param clusterName
+ * check if instance is disabled by cloud event.
* @param instanceName
- * @param message
- * @param isEnable
* @param dataAccessor
- * @return return failure when either enable/disable failed or update
cluster ZNode failed.
+ * @return true only when the instance is Helix disabled and the disabled
reason in
+ * instanceConfig is cloudEvent
*/
- static boolean enableInstanceForCloudEvent(String clusterName, String
instanceName, String message,
- boolean isEnable, BaseDataAccessor dataAccessor) {
- // TODO add impl here
- return true;
+ static boolean isInstanceDisabledForCloudEvent(String instanceName,
+ HelixDataAccessor dataAccessor) {
+ InstanceConfig instanceConfig =
+
dataAccessor.getProperty(dataAccessor.keyBuilder().instanceConfig(instanceName));
+ if (instanceConfig == null) {
+ throw new HelixException("Instance: " + instanceName
+ + ", instance config does not exist");
+ }
+ return !InstanceValidationUtil.isEnabled(dataAccessor, instanceName) &&
instanceConfig
+ .getInstanceDisabledType()
+ .equals(InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name());
}
/**
- * check if instance is disabled by cloud event.
- * @param clusterName
- * @param instanceName
- * @param dataAccessor
- * @return return true only when instance is Helix disabled and has the
cloud event info in
- * clusterConfig ZNode.
+ * Update map field disabledInstancesWithInfo in clusterConfig with
cloudEvent instance info
*/
- static boolean IsInstanceDisabledForCloudEvent(String clusterName, String
instanceName,
- BaseDataAccessor dataAccessor) {
- // TODO add impl here
- return true;
+ static void updateCloudEventOperationInClusterConfig(String clusterName,
String instanceName,
+ BaseDataAccessor baseAccessor, boolean enable, String message) {
+ String path = PropertyPathBuilder.clusterConfig(clusterName);
+
+ if (!baseAccessor.exists(path, 0)) {
+ throw new HelixException("Cluster " + clusterName + ": cluster config
does not exist");
+ }
+
+ if (!baseAccessor.update(path, new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ if (currentData == null) {
+ throw new HelixException("Cluster: " + clusterName + ": cluster
config is null");
+ }
+
+ ClusterConfig clusterConfig = new ClusterConfig(currentData);
+ Map<String, String> disabledInstancesWithInfo =
+ new TreeMap<>(clusterConfig.getDisabledInstancesWithInfo());
+ if (enable) {
+ disabledInstancesWithInfo.keySet().remove(instanceName);
+ } else {
+ // disabledInstancesWithInfo is only used for cloud event handling.
+ String timeStamp = String.valueOf(System.currentTimeMillis());
+ disabledInstancesWithInfo.put(instanceName, ZKHelixAdmin
+ .assembleInstanceBatchedDisabledInfo(
+ InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message,
timeStamp));
+ }
+ clusterConfig.setDisabledInstancesWithInfo(disabledInstancesWithInfo);
+
+ return clusterConfig.getRecord();
+ }
+ }, AccessOption.PERSISTENT)) {
+ LOG.error("Failed to update cluster config {} for {} instance {}. {}",
clusterName,
+ enable ? "enable" : "disable", instanceName, message);
+ }
}
+ /**
+ * Return true if no instance is under cloud event handling
+ * @param clusterConfig
+ * @return
+ */
+ static boolean checkNoInstanceUnderCloudEvent(ClusterConfig clusterConfig) {
+ Map<String, String> clusterConfigTrackedEvent =
clusterConfig.getDisabledInstancesWithInfo();
+ if (clusterConfigTrackedEvent == null ||
clusterConfigTrackedEvent.isEmpty()) {
+ return true;
+ }
+
+ for (Map.Entry<String, String> entry :
clusterConfigTrackedEvent.entrySet()) {
+ if (ConfigStringUtil.parseConcatenatedConfig(entry.getValue())
+
.get(ClusterConfig.ClusterConfigProperty.HELIX_DISABLED_TYPE.toString())
+ .equals(InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name())) {
+ return false;
+ }
+ }
+ return true;
+ }
}
diff --git
a/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
b/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
new file mode 100644
index 000000000..bf9b59dc2
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java
@@ -0,0 +1,111 @@
+package org.apache.helix.cloud.event;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.cloud.event.helix.DefaultCloudEventCallbackImpl;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.util.InstanceValidationUtil;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestDefaultCloudEventCallbackImpl extends ZkStandAloneCMTestBase {
+ private final DefaultCloudEventCallbackImpl _impl = new
DefaultCloudEventCallbackImpl();
+ private MockParticipantManager _instanceManager;
+ private HelixAdmin _admin;
+
+ public TestDefaultCloudEventCallbackImpl() throws IllegalAccessException,
InstantiationException {
+ }
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ super.beforeClass();
+ _instanceManager = _participants[0];
+ _admin = _instanceManager.getClusterManagmentTool();
+ }
+
+ @Test
+ public void testDisableInstance() {
+ Assert.assertTrue(InstanceValidationUtil
+ .isEnabled(_manager.getHelixDataAccessor(),
_instanceManager.getInstanceName()));
+ _impl.disableInstance(_instanceManager, null);
+ Assert.assertFalse(InstanceValidationUtil
+ .isEnabled(_manager.getHelixDataAccessor(),
_instanceManager.getInstanceName()));
+ Assert.assertEquals(_manager.getConfigAccessor()
+ .getInstanceConfig(CLUSTER_NAME, _instanceManager.getInstanceName())
+ .getInstanceDisabledType(),
InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name());
+
+ // Should not disable instance if it is already disabled due to other
reasons
+ // And disabled type should remain unchanged
+ _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(),
false);
+ _impl.disableInstance(_instanceManager, null);
+ Assert.assertFalse(InstanceValidationUtil
+ .isEnabled(_manager.getHelixDataAccessor(),
_instanceManager.getInstanceName()));
+ Assert.assertEquals(_manager.getConfigAccessor()
+ .getInstanceConfig(CLUSTER_NAME,
_instanceManager.getInstanceName())
+ .getInstanceDisabledType(),
+
InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name());
+
+ _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(),
false,
+ InstanceConstants.InstanceDisabledType.CLOUD_EVENT, null);
+ }
+
+ @Test (dependsOnMethods = "testDisableInstance")
+ public void testEnableInstance() {
+ Assert.assertFalse(InstanceValidationUtil
+ .isEnabled(_manager.getHelixDataAccessor(),
_instanceManager.getInstanceName()));
+ // Should enable instance if the instance is disabled due to cloud event
+ _impl.enableInstance(_instanceManager, null);
+ Assert.assertTrue(InstanceValidationUtil
+ .isEnabled(_manager.getHelixDataAccessor(),
_instanceManager.getInstanceName()));
+
+ // Should not enable instance if it is not disabled due to cloud event
+ _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(),
false);
+ _impl.enableInstance(_instanceManager, null);
+ Assert.assertFalse(InstanceValidationUtil
+ .isEnabled(_manager.getHelixDataAccessor(),
_instanceManager.getInstanceName()));
+ _admin.enableInstance(_instanceManager.getClusterName(),
_instanceManager.getInstanceName(),
+ true);
+ }
+
+ @Test
+ public void testEnterMaintenanceMode() {
+ Assert.assertFalse(_admin.isInMaintenanceMode(CLUSTER_NAME));
+ _impl.enterMaintenanceMode(_instanceManager, null);
+ _impl.disableInstance(_instanceManager, null);
+ Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
+ }
+
+ @Test (dependsOnMethods = "testEnterMaintenanceMode")
+ public void testExitMaintenanceMode() {
+ Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
+ // Should not exit maintenance mode if there is remaining live instance
that is disabled due to cloud event
+ _impl.exitMaintenanceMode(_instanceManager, null);
+ Assert.assertTrue(_admin.isInMaintenanceMode(CLUSTER_NAME));
+
+ // Should exit maintenance mode if there is no remaining live instance
that is disabled due to cloud event
+ _impl.enableInstance(_instanceManager, null);
+ _impl.exitMaintenanceMode(_instanceManager, null);
+ Assert.assertFalse(_admin.isInMaintenanceMode(CLUSTER_NAME));
+ }
+}
\ No newline at end of file