This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch helix-vm-freeze in repository https://gitbox.apache.org/repos/asf/helix.git
commit 3cf41231e1108f2d6250336177eb3851ed53f5fb Author: Molly Gao <[email protected]> AuthorDate: Mon Mar 21 13:31:35 2022 -0700 Enable HelixManager as an event listener (#1978) Make helix manager cloud event aware by registering a cloud event listener when connect Helix manager --- .../java/org/apache/helix/HelixCloudProperty.java | 21 ++ .../helix/cloud/event/CloudEventHandler.java | 32 +-- .../helix/cloud/event/CloudEventListener.java | 24 ++- .../event/helix/CloudEventCallbackProperty.java | 138 ++++++++++++ .../event/helix/DefaultCloudEventCallbackImpl.java | 69 ++++++ .../cloud/event/helix/HelixCloudEventListener.java | 121 +++++++++++ .../apache/helix/manager/zk/ZKHelixManager.java | 32 +++ .../cloud/event/MockCloudEventCallbackImpl.java | 61 ++++++ .../event/TestCloudEventCallbackProperty.java | 239 +++++++++++++++++++++ 9 files changed, 702 insertions(+), 35 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java b/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java index 6cbc2e1..9af7175 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java +++ b/helix-core/src/main/java/org/apache/helix/HelixCloudProperty.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.List; import java.util.Properties; import org.apache.helix.cloud.constants.CloudProvider; +import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty; import org.apache.helix.model.CloudConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +70,10 @@ public class HelixCloudProperty { // Other customized properties that may be used. 
private Properties _customizedCloudProperties = new Properties(); + private boolean _isCloudEventCallbackEnabled; + + private CloudEventCallbackProperty _cloudEventCallbackProperty; + /** * Initialize Helix Cloud Property based on the provider * @param @@ -180,4 +185,20 @@ public class HelixCloudProperty { public void setCustomizedCloudProperties(Properties customizedCloudProperties) { _customizedCloudProperties.putAll(customizedCloudProperties); } + + public boolean isCloudEventCallbackEnabled() { + return _isCloudEventCallbackEnabled; + } + + public void setCloudEventCallbackEnabled(boolean enabled) { + _isCloudEventCallbackEnabled = enabled; + } + + public CloudEventCallbackProperty getCloudEventCallbackProperty() { + return _cloudEventCallbackProperty; + } + + public void setCloudEventCallbackProperty(CloudEventCallbackProperty cloudEventCallbackProperty) { + _cloudEventCallbackProperty = cloudEventCallbackProperty; + } } diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java index 295194b..5dcbe16 100644 --- a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java +++ b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventHandler.java @@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory; public class CloudEventHandler { private static final Logger LOG = LoggerFactory.getLogger(CloudEventHandler.class.getName()); private List<CloudEventListener> _unorderedEventListenerList = new ArrayList<>(); - private Optional<CloudEventListener> _preEventHandlerCallback; - private Optional<CloudEventListener> _postEventHandlerCallback; + private Optional<CloudEventListener> _preEventHandlerCallback = Optional.empty(); + private Optional<CloudEventListener> _postEventHandlerCallback = Optional.empty(); /** * Register an event listener to the event handler. 
@@ -70,28 +70,12 @@ public class CloudEventHandler { } /** - * Trigger the callback / listeners in order of - * 1. PreEventHandlerCallback - * 2. Unordered CloudEventListener list - * 3. PostEventHandlerCallback - * @param eventInfo the object contains any information about the incoming event + * Trigger the registered listeners in order, + * and trigger the corresponding callback registered in the listeners for a certain type of event. */ - public void onPause(Object eventInfo) { - _preEventHandlerCallback.ifPresent(callback -> callback.onPause(eventInfo)); - _unorderedEventListenerList.parallelStream().forEach(listener -> listener.onPause(eventInfo)); - _postEventHandlerCallback.ifPresent(callback -> callback.onPause(eventInfo)); - } - - /** - * Trigger the callback / listeners in order of - * 1. PreEventHandlerCallback - * 2. Unordered CloudEventListener list - * 3. PostEventHandlerCallback - * @param eventInfo the object contains any information about the incoming event - */ - public void onResume(Object eventInfo) { - _preEventHandlerCallback.ifPresent(callback -> callback.onResume(eventInfo)); - _unorderedEventListenerList.parallelStream().forEach(listener -> listener.onResume(eventInfo)); - _postEventHandlerCallback.ifPresent(callback -> callback.onResume(eventInfo)); + public void performAction(Object eventType, Object eventInfo) { + _preEventHandlerCallback.ifPresent(callback -> callback.performAction(eventType, eventInfo)); + _unorderedEventListenerList.parallelStream().forEach(listener -> listener.performAction(eventType, eventInfo)); + _postEventHandlerCallback.ifPresent(callback -> callback.performAction(eventType, eventInfo)); } } diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventListener.java b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventListener.java index b4adcc3..5e75337 100644 --- a/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventListener.java +++ 
b/helix-core/src/main/java/org/apache/helix/cloud/event/CloudEventListener.java @@ -24,25 +24,27 @@ package org.apache.helix.cloud.event; * The listeners can be registered to {@link CloudEventHandler}. */ public interface CloudEventListener { - enum ListenerType { - PRE_EVENT_HANDLER, UNORDERED, POST_EVENT_HANDLER - } - /** - * Callback for when a pause event is incoming - * @param eventInfo the info of the incoming event + * Defines different listener types + * It determines the position at which this listener is triggered in {@link CloudEventHandler} + * Order being: PRE_EVENT_HANDLER -> UNORDERED (in parallel) -> POST_EVENT_HANDLER */ - void onPause(Object eventInfo); + enum ListenerType { + PRE_EVENT_HANDLER, + UNORDERED, + POST_EVENT_HANDLER + } /** - * Callback for when a pause event finishes - * @param eventInfo the info of the incoming event + * Perform action to react to the event + * @param eventType Type of the event + * @param eventInfo Detailed information about the event */ - void onResume(Object eventInfo); + void performAction(Object eventType, Object eventInfo); /** * Get the listener type of a listener - * @return the type of the listener + * @return The type of the listener */ ListenerType getListenerType(); } diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java new file mode 100644 index 0000000..1888eb6 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/CloudEventCallbackProperty.java @@ -0,0 +1,138 @@ +package org.apache.helix.cloud.event.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; + +import org.apache.helix.HelixManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A property for users to customize the behavior of a Helix manager as a cloud event listener + */ +public class CloudEventCallbackProperty { + private static final Logger LOG = + LoggerFactory.getLogger(CloudEventCallbackProperty.class.getName()); + + private Set<HelixOperation> _enabledHelixOperation; + private Map<UserDefinedCallbackType, BiConsumer<HelixManager, Object>> _userDefinedCallbackMap; + private final Map<String, String> _userArgs; + + /** + * Constructor + * @param userArgs A map containing information that users pass in + */ + public CloudEventCallbackProperty(Map<String, String> userArgs) { + _enabledHelixOperation = new HashSet<>(); + _userDefinedCallbackMap = new HashMap<>(); + _userArgs = userArgs; + } + + /** + * Keys for retrieving information from the map users pass in + */ + public static class UserArgsInputKey { + public static final String CALLBACK_IMPL_CLASS_NAME = "callbackImplClassName"; + } + + /** + * A collection of types of Helix operations + */ + public enum HelixOperation { + ENABLE_DISABLE_INSTANCE, + MAINTENANCE_MODE + } + + /** + * A collection of types and positions for 
user to plug in customized callback + */ + public enum UserDefinedCallbackType { + PRE_ON_PAUSE, + POST_ON_PAUSE, + PRE_ON_RESUME, + POST_ON_RESUME, + } + + /** + * Enable or disable a Helix-supported operation + * The operation is implemented in the callback impl class + * @param operation operation type + */ + public void setHelixOperationEnabled(HelixOperation operation, boolean enabled) { + if (enabled) { + _enabledHelixOperation.add(operation); + } else { + _enabledHelixOperation.remove(operation); + } + } + + /** + * Register a user defined callback at a user specified position + * The position is relative to Helix operations + * There are two options for each type (onPause or onResume): + * 1. PRE: The user defined callback will be the first callback being called in the listener + * 2. POST: The user defined callback will be the last callback being called in the listener + * @param callbackType The type and position for registering the callback + * @param callback The implementation of the callback + */ + public void registerUserDefinedCallback(UserDefinedCallbackType callbackType, + BiConsumer<HelixManager, Object> callback) { + LOG.info("Registering callback {} as {} type user defined callback...", callback, + callbackType.name()); + _userDefinedCallbackMap.put(callbackType, callback); + } + + /** + * Unregister a user defined callback at a user specified position + * @param callbackType The type and position of the callback to unregister + */ + public void unregisterUserDefinedCallback(UserDefinedCallbackType callbackType) { + LOG.info("Unregistering {} type user defined callback...", callbackType.name()); + _userDefinedCallbackMap.remove(callbackType); + } + + /** + * Get the user passed-in information + * @return Empty map if not defined; an unmodifiable map otherwise + */ + public Map<String, String> getUserArgs() { + return _userArgs == null ? 
Collections.emptyMap() : Collections.unmodifiableMap(_userArgs); + } + + /** + * Get the map where user defined callbacks are stored + */ + public Map<UserDefinedCallbackType, BiConsumer<HelixManager, Object>> getUserDefinedCallbackMap() { + return _userDefinedCallbackMap; + } + + /** + * Get the set where enabled Helix operations are stored + */ + public Set<HelixOperation> getEnabledHelixOperation() { + return _enabledHelixOperation; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java new file mode 100644 index 0000000..d9aa3b2 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java @@ -0,0 +1,69 @@ +package org.apache.helix.cloud.event.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.HelixManager; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +/** + * A default callback implementation class to be used in {@link HelixCloudEventListener} + */ +public class DefaultCloudEventCallbackImpl { + + /** + * Disable the instance + * @param manager The helix manager associated with the listener + * @param eventInfo Detailed information about the event + */ + public void disableInstance(HelixManager manager, Object eventInfo) { + // To be implemented + throw new NotImplementedException(); + } + + /** + * Enable the instance + * @param manager The helix manager associated with the listener + * @param eventInfo Detailed information about the event + */ + public void enableInstance(HelixManager manager, Object eventInfo) { + // To be implemented + throw new NotImplementedException(); + } + + /** + * + * @param manager The helix manager associated with the listener + * @param eventInfo Detailed information about the event + */ + public void enterMaintenanceMode(HelixManager manager, Object eventInfo) { + // To be implemented + throw new NotImplementedException(); + } + + /** + * + * @param manager The helix manager associated with the listener + * @param eventInfo Detailed information about the event + */ + public void exitMaintenanceMode(HelixManager manager, Object eventInfo) { + // To be implemented + throw new NotImplementedException(); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java new file mode 100644 index 0000000..57d5a43 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixCloudEventListener.java @@ -0,0 +1,121 @@ +package org.apache.helix.cloud.event.helix; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.function.BiConsumer; + +import org.apache.helix.HelixException; 
+import org.apache.helix.HelixManager; +import org.apache.helix.cloud.event.CloudEventListener; +import org.apache.helix.util.HelixUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.helix.cloud.event.helix.CloudEventCallbackProperty.HelixOperation; +import static org.apache.helix.cloud.event.helix.CloudEventCallbackProperty.UserDefinedCallbackType; + +/** + * A helix manager-based cloud event listener implementation + */ +public class HelixCloudEventListener implements CloudEventListener { + private static Logger LOG = LoggerFactory.getLogger(HelixCloudEventListener.class); + + private final CloudEventCallbackProperty _property; + private final DefaultCloudEventCallbackImpl _callbackImplClass; + private final HelixManager _helixManager; + + public HelixCloudEventListener(CloudEventCallbackProperty property, HelixManager helixManager) + throws InstantiationException, IllegalAccessException { + this._property = property; + this._helixManager = helixManager; + this._callbackImplClass = loadCloudEventCallbackImplClass(property.getUserArgs() + .getOrDefault(CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME, + DefaultCloudEventCallbackImpl.class.getCanonicalName())); + } + + /** + * The type of incoming event + */ + public enum EventType { + ON_PAUSE, ON_RESUME + } + + /** + * Below are lists defining the type and sequence of callbacks for each type of events + */ + private final List<Object> onPauseOperations = Arrays + .asList(UserDefinedCallbackType.PRE_ON_PAUSE, HelixOperation.MAINTENANCE_MODE, + HelixOperation.ENABLE_DISABLE_INSTANCE, UserDefinedCallbackType.POST_ON_PAUSE); + private final List<Object> onResumeOperations = Arrays + .asList(UserDefinedCallbackType.PRE_ON_RESUME, HelixOperation.ENABLE_DISABLE_INSTANCE, + HelixOperation.MAINTENANCE_MODE, UserDefinedCallbackType.POST_ON_RESUME); + + @Override + public void performAction(Object eventType, Object eventInfo) { + LOG.info("Received {} event, event 
info {}, timestamp {}. Acting on the event... " + + "Actor {}, based on callback implementation class {}.", ((EventType) eventType).name(), + eventInfo == null ? "N/A" : eventInfo.toString(), System.currentTimeMillis(), _helixManager, + _callbackImplClass.getClass().getCanonicalName()); + + if (eventType == EventType.ON_PAUSE) { + onPauseOperations.forEach(operation -> executeOperation(eventType, eventInfo, operation)); + } else if (eventType == EventType.ON_RESUME) { + onResumeOperations.forEach(operation -> executeOperation(eventType, eventInfo, operation)); + } + } + + private void executeOperation(Object eventType, Object eventInfo, Object operation) { + Set<CloudEventCallbackProperty.HelixOperation> enabledHelixOperationSet = + _property.getEnabledHelixOperation(); + if (HelixOperation.ENABLE_DISABLE_INSTANCE.equals(operation)) { + if (enabledHelixOperationSet.contains(HelixOperation.ENABLE_DISABLE_INSTANCE)) { + if (eventType == EventType.ON_PAUSE) { + _callbackImplClass.disableInstance(_helixManager, eventInfo); + } else { + _callbackImplClass.enableInstance(_helixManager, eventInfo); + } + } + } else if (HelixOperation.MAINTENANCE_MODE.equals(operation)) { + if (enabledHelixOperationSet.contains(HelixOperation.MAINTENANCE_MODE)) { + if (eventType == EventType.ON_PAUSE) { + _callbackImplClass.enterMaintenanceMode(_helixManager, eventInfo); + } else { + _callbackImplClass.exitMaintenanceMode(_helixManager, eventInfo); + } + } + } else if (operation instanceof UserDefinedCallbackType) { + BiConsumer<HelixManager, Object> callback = + _property.getUserDefinedCallbackMap().get(operation); + if (callback != null) { + callback.accept(_helixManager, eventInfo); + } + } else { + // Should not reach here + throw new HelixException("Unknown category of cloud event operation " + operation.toString()); + } + } + + @Override + public CloudEventListener.ListenerType getListenerType() { + return CloudEventListener.ListenerType.UNORDERED; + } + + private 
DefaultCloudEventCallbackImpl loadCloudEventCallbackImplClass(String implClassName) + throws IllegalAccessException, InstantiationException { + DefaultCloudEventCallbackImpl implClass; + try { + LOG.info("Loading class: " + implClassName); + implClass = (DefaultCloudEventCallbackImpl) HelixUtil.loadClass(getClass(), implClassName) + .newInstance(); + } catch (Exception e) { + implClass = DefaultCloudEventCallbackImpl.class.newInstance(); + LOG.error( + "No cloud event callback implementation class found for: {}. message: {}. Using default callback impl class instead.", + implClassName, e.getMessage()); + } + LOG.info("Using {} as cloud event callback impl class.", + implClass.getClass().getCanonicalName()); + return implClass; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index d7a3472..22f0330 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -38,6 +38,7 @@ import org.apache.helix.BaseDataAccessor; import org.apache.helix.ClusterMessagingService; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixCloudProperty; import org.apache.helix.HelixConstants.ChangeType; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; @@ -70,6 +71,10 @@ import org.apache.helix.api.listeners.LiveInstanceChangeListener; import org.apache.helix.api.listeners.MessageListener; import org.apache.helix.api.listeners.ResourceConfigChangeListener; import org.apache.helix.api.listeners.ScopedConfigChangeListener; +import org.apache.helix.cloud.event.CloudEventHandlerFactory; +import org.apache.helix.cloud.event.CloudEventListener; +import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty; +import org.apache.helix.cloud.event.helix.HelixCloudEventListener; import 
org.apache.helix.controller.GenericHelixController; import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.healthcheck.ParticipantHealthReportCollector; @@ -175,6 +180,11 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { protected final List<HelixTimerTask> _controllerTimerTasks = new ArrayList<>(); /** + * Cloud fields + */ + private CloudEventListener _cloudEventListener; + + /** * status dump timer-task */ protected static class StatusDumpTask extends HelixTimerTask { @@ -816,6 +826,15 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { } throw e; } + + if (_helixManagerProperty != null) { + HelixCloudProperty helixCloudProperty = _helixManagerProperty.getHelixCloudProperty(); + if (helixCloudProperty != null && helixCloudProperty.isCloudEventCallbackEnabled()) { + _cloudEventListener = + new HelixCloudEventListener(helixCloudProperty.getCloudEventCallbackProperty(), this); + CloudEventHandlerFactory.getInstance().registerCloudEventListener(_cloudEventListener); + } + } } @Override @@ -861,6 +880,11 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { _helixPropertyStore = null; + if (_cloudEventListener != null) { + CloudEventHandlerFactory.getInstance().unregisterCloudEventListener(_cloudEventListener); + _cloudEventListener = null; + } + synchronized (this) { if (_controller != null) { _controller = null; @@ -1446,6 +1470,14 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { return _sessionStartTime; } + private CloudEventCallbackProperty getCloudEventListenerCallbackProperty() { + HelixCloudProperty cloudProperty = _helixManagerProperty.getHelixCloudProperty(); + if (cloudProperty == null || !cloudProperty.isCloudEventCallbackEnabled()) { + return null; + } + return cloudProperty.getCloudEventCallbackProperty(); + } + /* * Prepares connection config and client config based on the internal parameters given to * HelixManager in order to create a 
ZkClient instance to use. Note that a shared ZkClient diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventCallbackImpl.java b/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventCallbackImpl.java new file mode 100644 index 0000000..60bb12f --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/cloud/event/MockCloudEventCallbackImpl.java @@ -0,0 +1,61 @@ +package org.apache.helix.cloud.event; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.HashSet; +import java.util.Set; + +import org.apache.helix.HelixManager; +import org.apache.helix.cloud.event.helix.DefaultCloudEventCallbackImpl; + +public class MockCloudEventCallbackImpl extends DefaultCloudEventCallbackImpl { + public enum OperationType { + ON_PAUSE_DISABLE_INSTANCE, + ON_RESUME_ENABLE_INSTANCE, + ON_PAUSE_MAINTENANCE_MODE, + ON_RESUME_MAINTENANCE_MODE, + PRE_ON_PAUSE, + POST_ON_PAUSE, + PRE_ON_RESUME, + POST_ON_RESUME + } + + public static Set<OperationType> triggeredOperation = new HashSet<>(); + + @Override + public void disableInstance(HelixManager manager, Object eventInfo) { + triggeredOperation.add(OperationType.ON_PAUSE_DISABLE_INSTANCE); + } + + @Override + public void enableInstance(HelixManager manager, Object eventInfo) { + triggeredOperation.add(OperationType.ON_RESUME_ENABLE_INSTANCE); + } + + @Override + public void enterMaintenanceMode(HelixManager manager, Object eventInfo) { + triggeredOperation.add(OperationType.ON_PAUSE_MAINTENANCE_MODE); + } + + @Override + public void exitMaintenanceMode(HelixManager manager, Object eventInfo) { + triggeredOperation.add(OperationType.ON_RESUME_MAINTENANCE_MODE); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java b/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java new file mode 100644 index 0000000..70d9e1b --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/cloud/event/TestCloudEventCallbackProperty.java @@ -0,0 +1,239 @@ +package org.apache.helix.cloud.event; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collections; + +import org.apache.helix.HelixCloudProperty; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerProperty; +import org.apache.helix.InstanceType; +import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty; +import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty.HelixOperation; +import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty.UserDefinedCallbackType; +import org.apache.helix.cloud.event.helix.HelixCloudEventListener; +import org.apache.helix.manager.zk.HelixManagerStateListener; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.CloudConfig; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.testng.Assert; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestCloudEventCallbackProperty { + private HelixManager _helixManager; + private HelixCloudProperty _cloudProperty; + private final static String CLUSTER_NAME = "testCluster"; + + @BeforeClass + public void beforeClass() throws Exception { + // Set up Helix manager property: Helix Cloud Property + _cloudProperty = new HelixCloudProperty(new CloudConfig(new ZNRecord(CLUSTER_NAME))); + _cloudProperty.setCloudEventCallbackEnabled(true); + HelixManagerProperty.Builder managerPropertyBuilder = new HelixManagerProperty.Builder(); + managerPropertyBuilder.setHelixCloudProperty(_cloudProperty); + + // Build Helix manager property + HelixManagerProperty 
managerProperty = managerPropertyBuilder.build();
+
+    // Create Helix Manager
+    _helixManager =
+        new MockEventAwareZKHelixManager(CLUSTER_NAME, "instanceName", InstanceType.PARTICIPANT,
+            null, null, managerProperty);
+  }
+
+  /**
+   * Resets the state the tests share: disconnects the manager, switches both optional Helix
+   * operations off, removes the four user-defined callbacks and clears the operations recorded
+   * by {@link MockCloudEventCallbackImpl}.
+   *
+   * NOTE(review): assuming this is TestNG's {@code @AfterTest}, it runs once per test group
+   * rather than after every test method, which would explain why testUserDefinedCallback calls
+   * it explicitly. Consider {@code @AfterMethod} if per-method cleanup is what is intended.
+   */
+  @AfterTest
+  public void afterTest() {
+    _helixManager.disconnect();
+
+    // The callback property stays null until a test installs one through
+    // setCloudEventCallbackProperty, so guard against running before that happened.
+    CloudEventCallbackProperty property = _cloudProperty.getCloudEventCallbackProperty();
+    if (property != null) {
+      property.setHelixOperationEnabled(HelixOperation.ENABLE_DISABLE_INSTANCE, false);
+      property.setHelixOperationEnabled(HelixOperation.MAINTENANCE_MODE, false);
+      property.unregisterUserDefinedCallback(UserDefinedCallbackType.PRE_ON_PAUSE);
+      property.unregisterUserDefinedCallback(UserDefinedCallbackType.POST_ON_PAUSE);
+      property.unregisterUserDefinedCallback(UserDefinedCallbackType.PRE_ON_RESUME);
+      property.unregisterUserDefinedCallback(UserDefinedCallbackType.POST_ON_RESUME);
+    }
+
+    MockCloudEventCallbackImpl.triggeredOperation.clear();
+  }
+
+  /**
+   * Checks that the mock callback implementation only records an operation when the matching
+   * {@link HelixOperation} is enabled on the property, including an operation that is enabled
+   * after the manager has already connected.
+   */
+  @Test
+  public void testOptionalHelixOperation() throws Exception {
+    // Cloud event callback property
+    CloudEventCallbackProperty property = new CloudEventCallbackProperty(Collections
+        .singletonMap(CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
+            MockCloudEventCallbackImpl.class.getCanonicalName()));
+    property.setHelixOperationEnabled(HelixOperation.ENABLE_DISABLE_INSTANCE, true);
+    _cloudProperty.setCloudEventCallbackProperty(property);
+
+    _helixManager.connect();
+
+    // Manually trigger event: only the instance enable/disable operation is switched on
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
+    Assert.assertTrue(
+        callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_PAUSE_DISABLE_INSTANCE));
+    Assert.assertFalse(
+        callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_PAUSE_MAINTENANCE_MODE));
+    Assert.assertFalse(
+        callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_RESUME_MAINTENANCE_MODE));
+    Assert.assertFalse(
+        callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_RESUME_ENABLE_INSTANCE));
+
+    // Enable the second operation on the same property object, after connect()
+    property.setHelixOperationEnabled(HelixOperation.MAINTENANCE_MODE, true);
+
+    MockCloudEventCallbackImpl.triggeredOperation.clear();
+
+    // Manually trigger event: both pause operations are expected now
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
+    Assert.assertTrue(
+        callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_PAUSE_DISABLE_INSTANCE));
+    Assert.assertTrue(
+        callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_PAUSE_MAINTENANCE_MODE));
+    Assert.assertFalse(
+        callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_RESUME_MAINTENANCE_MODE));
+    Assert.assertFalse(
+        callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_RESUME_ENABLE_INSTANCE));
+
+    MockCloudEventCallbackImpl.triggeredOperation.clear();
+
+    // Manually trigger event: a resume must record the resume operations and no pause ones
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_RESUME, null);
+    Assert.assertFalse(
+        callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_PAUSE_DISABLE_INSTANCE));
+    Assert.assertFalse(
+        callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_PAUSE_MAINTENANCE_MODE));
+    Assert.assertTrue(
+        callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_RESUME_ENABLE_INSTANCE));
+    Assert.assertTrue(
+        callbackTriggered(MockCloudEventCallbackImpl.OperationType.ON_RESUME_MAINTENANCE_MODE));
+  }
+
+  /**
+   * Checks that user-defined pre/post callbacks registered on the property are invoked for the
+   * event type they were registered for, and not for the other one.
+   */
+  @Test
+  public void testUserDefinedCallback() throws Exception {
+    // Start from a clean slate: drop the listener and recorded operations of earlier tests
+    afterTest();
+    // Cloud event callback property
+    CloudEventCallbackProperty property = new CloudEventCallbackProperty(Collections
+        .singletonMap(CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
+            MockCloudEventCallbackImpl.class.getCanonicalName()));
+    _cloudProperty.setCloudEventCallbackProperty(property);
+
+    _helixManager.connect();
+
+    // Each callback only records its own marker so the assertions below can tell which ran
+    property
+        .registerUserDefinedCallback(UserDefinedCallbackType.PRE_ON_PAUSE, (manager, eventInfo) -> {
+          MockCloudEventCallbackImpl.triggeredOperation
+              .add(MockCloudEventCallbackImpl.OperationType.PRE_ON_PAUSE);
+        });
+    property.registerUserDefinedCallback(UserDefinedCallbackType.POST_ON_PAUSE,
+        (manager, eventInfo) -> {
+          MockCloudEventCallbackImpl.triggeredOperation
+              .add(MockCloudEventCallbackImpl.OperationType.POST_ON_PAUSE);
+        });
+    property.registerUserDefinedCallback(UserDefinedCallbackType.PRE_ON_RESUME,
+        (manager, eventInfo) -> {
+          MockCloudEventCallbackImpl.triggeredOperation
+              .add(MockCloudEventCallbackImpl.OperationType.PRE_ON_RESUME);
+        });
+    property.registerUserDefinedCallback(UserDefinedCallbackType.POST_ON_RESUME,
+        (manager, eventInfo) -> {
+          MockCloudEventCallbackImpl.triggeredOperation
+              .add(MockCloudEventCallbackImpl.OperationType.POST_ON_RESUME);
+        });
+
+    // Manually trigger event
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
+    Assert.assertTrue(callbackTriggered(MockCloudEventCallbackImpl.OperationType.PRE_ON_PAUSE));
+    Assert.assertTrue(callbackTriggered(MockCloudEventCallbackImpl.OperationType.POST_ON_PAUSE));
+    Assert.assertFalse(callbackTriggered(MockCloudEventCallbackImpl.OperationType.PRE_ON_RESUME));
+    Assert.assertFalse(callbackTriggered(MockCloudEventCallbackImpl.OperationType.POST_ON_RESUME));
+
+    MockCloudEventCallbackImpl.triggeredOperation.clear();
+
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_RESUME, null);
+    Assert.assertFalse(callbackTriggered(MockCloudEventCallbackImpl.OperationType.PRE_ON_PAUSE));
+    Assert.assertFalse(callbackTriggered(MockCloudEventCallbackImpl.OperationType.POST_ON_PAUSE));
+    Assert.assertTrue(callbackTriggered(MockCloudEventCallbackImpl.OperationType.PRE_ON_RESUME));
+    Assert.assertTrue(callbackTriggered(MockCloudEventCallbackImpl.OperationType.POST_ON_RESUME));
+  }
+
+  /**
+   * Connects and dispatches an event with a callback implementation class name that is not the
+   * mock implementation. The test contains no assertions, so it passes as long as neither
+   * connecting nor dispatching throws.
+   */
+  @Test
+  public void testUsingInvalidImplClassName() throws Exception {
+    // Cloud event callback property
+    CloudEventCallbackProperty property = new CloudEventCallbackProperty(Collections
+        .singletonMap(CloudEventCallbackProperty.UserArgsInputKey.CALLBACK_IMPL_CLASS_NAME,
+            "org.apache.helix.cloud.InvalidClassName"));
+    _cloudProperty.setCloudEventCallbackProperty(property);
+
+    _helixManager.connect();
+
+    // Manually trigger event
+    CloudEventHandlerFactory.getInstance()
+        .performAction(HelixCloudEventListener.EventType.ON_PAUSE, null);
+  }
+
+  /**
+   * @param type the operation to look for
+   * @return true if the mock callback implementation has recorded {@code type} since its record
+   *         was last cleared
+   */
+  private boolean callbackTriggered(MockCloudEventCallbackImpl.OperationType type) {
+    return MockCloudEventCallbackImpl.triggeredOperation.contains(type);
+  }
+
+  /**
+   * ZKHelixManager whose connect()/disconnect() only register and unregister the cloud event
+   * listener, so the tests can exercise the callback wiring on their own.
+   */
+  public static class MockEventAwareZKHelixManager extends ZKHelixManager {
+    private final HelixManagerProperty _helixManagerProperty;
+    // Listener currently registered with the cloud event handler; null while disconnected
+    private CloudEventListener _cloudEventListener;
+
+    /**
+     * Use a mock zk helix manager to avoid the need to connect to zk
+     */
+    public MockEventAwareZKHelixManager(String clusterName, String instanceName,
+        InstanceType instanceType, String zkAddress, HelixManagerStateListener stateListener,
+        HelixManagerProperty helixManagerProperty) {
+      super(clusterName, instanceName, instanceType, zkAddress, stateListener,
+          helixManagerProperty);
+      _helixManagerProperty = helixManagerProperty;
+    }
+
+    /**
+     * Registers a new HelixCloudEventListener with the cloud event handler when the manager
+     * property carries a cloud property that has cloud event callbacks enabled. A listener left
+     * over from an earlier connect() is unregistered first.
+     */
+    @Override
+    public void connect() throws IllegalAccessException, InstantiationException {
+      // A test may call connect() again without an intervening disconnect(). Without this the
+      // old listener would stay registered with the handler returned by
+      // CloudEventHandlerFactory.getInstance() and keep firing, with no reference left to
+      // unregister it.
+      disconnect();
+
+      if (_helixManagerProperty != null) {
+        HelixCloudProperty helixCloudProperty = _helixManagerProperty.getHelixCloudProperty();
+        if (helixCloudProperty != null && helixCloudProperty.isCloudEventCallbackEnabled()) {
+          _cloudEventListener =
+              new HelixCloudEventListener(helixCloudProperty.getCloudEventCallbackProperty(), this);
+          CloudEventHandlerFactory.getInstance().registerCloudEventListener(_cloudEventListener);
+        }
+      }
+    }
+
+    /**
+     * Unregisters the listener registered by connect(), if any. Safe to call repeatedly.
+     */
+    @Override
+    public void disconnect() {
+      if (_cloudEventListener != null) {
+        CloudEventHandlerFactory.getInstance().unregisterCloudEventListener(_cloudEventListener);
+        _cloudEventListener = null;
+      }
+    }
+  }
+}
