This is an automated email from the ASF dual-hosted git repository.
amagyar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push:
new 1da5edc9f KNOX-2959 - Auto discovery to support scaling scenarios (#796)
1da5edc9f is described below
commit 1da5edc9f044a83262f485e2cdb767384a92038c
Author: Attila Magyar <[email protected]>
AuthorDate: Wed Oct 4 13:14:42 2023 +0200
KNOX-2959 - Auto discovery to support scaling scenarios (#796)
---
.../ClouderaManagerServiceDiscoveryMessages.java | 11 ++
.../cm/monitor/PollingConfigurationAnalyzer.java | 121 +++++++++++++++++----
.../monitor/PollingConfigurationAnalyzerTest.java | 34 +++++-
3 files changed, 146 insertions(+), 20 deletions(-)
diff --git
a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
index 4c1842fa2..ca331241d 100644
---
a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
+++
b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
@@ -16,6 +16,8 @@
*/
package org.apache.knox.gateway.topology.discovery.cm;
+import java.util.Set;
+
import com.cloudera.api.swagger.client.ApiException;
import org.apache.knox.gateway.i18n.messages.Message;
import org.apache.knox.gateway.i18n.messages.MessageLevel;
@@ -199,6 +201,9 @@ public interface ClouderaManagerServiceDiscoveryMessages {
@Message(level = MessageLevel.DEBUG, text = "Activation event relevance: {0}
= {1} ({2} / {3} / {4} / {5})")
void activationEventRelevance(String eventId, String relevance, String
command, String status, String serviceType, boolean
serviceModelGeneratorExists);
+ @Message(level = MessageLevel.DEBUG, text = "Scale event relevance: {0} =
{1} ({2} / {3} / {4})")
+ void scaleEventRelevance(String eventId, String relevance, String eventCode,
String serviceType, boolean serviceModelGeneratorExists);
+
@Message(level = MessageLevel.DEBUG, text = "Activation event - {0} - has
already been processed, skipping ...")
void activationEventAlreadyProcessed(String eventId);
@@ -264,4 +269,10 @@ public interface ClouderaManagerServiceDiscoveryMessages {
@Message(level = MessageLevel.WARN, text = "The configured maximum retry
attempts of {0} may overlap with the configured polling interval settings;
using {1} retry attempts")
void updateMaxRetryAttempts(int configured, int actual);
+
+ @Message(level = MessageLevel.DEBUG, text = "Found upscale event for role:
{0} hosts: {1}")
+ void foundUpScaleEvent(String role, Set<String> hosts);
+
+ @Message(level = MessageLevel.DEBUG, text = "Found downscale event for role:
{0} hosts: {1}")
+ void foundDownScaleEvent(String role, Set<String> hosts);
}
diff --git
a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
index 1c0374d81..c4e511d36 100644
---
a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
+++
b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
@@ -61,6 +61,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -91,10 +92,16 @@ public class PollingConfigurationAnalyzer implements
Runnable {
static final String CM_SERVICE_TYPE = "ManagerServer";
static final String CM_SERVICE = "ClouderaManager";
+ public static final String EVENT_CODE_ROLE_DELETED = "EV_ROLE_DELETED";
+ public static final String EVENT_CODE_ROLE_CREATED = "EV_ROLE_CREATED";
+
// Collection of those commands which represent the potential activation of
service configuration changes
- private static final Collection<String> ACTIVATION_COMMANDS =
Arrays.asList(START_COMMAND, RESTART_COMMAND, ROLLING_RESTART_COMMAND,
+ private static final Collection<String> START_COMMANDS =
Arrays.asList(START_COMMAND, RESTART_COMMAND, ROLLING_RESTART_COMMAND,
RESTART_WAITING_FOR_STALENESS_SUCCESS_COMMAND);
+ private static final Collection<String> DELETED_EVENT_CODES =
Arrays.asList(EVENT_CODE_ROLE_DELETED);
+ private static final Collection<String> CREATED_EVENT_CODES =
Arrays.asList(EVENT_CODE_ROLE_CREATED);
+
// The format of the filter employed when start events are queried from
ClouderaManager
private static final String EVENTS_QUERY_FORMAT =
"category==" +
ApiEventCategory.AUDIT_EVENT.getValue() +
@@ -222,14 +229,16 @@ public class PollingConfigurationAnalyzer implements
Runnable {
// Configuration changes don't mean anything without corresponding
service start/restarts. Therefore, monitor
// start events, and check the configuration only of the restarted
service(s) to identify changes
// that should trigger re-discovery.
- final List<StartEvent> relevantEvents = getRelevantEvents(address,
clusterName);
+ final List<RelevantEvent> relevantEvents =
getRelevantEvents(address, clusterName);
- // If there are no recent start events, then nothing to do now
+ // If there are no recent start or scale events, then nothing to do now
if (!relevantEvents.isEmpty()) {
// If a change has occurred, notify the listeners
- if (hasConfigChanged(address, clusterName, relevantEvents)) {
+ if (hasConfigChanged(address, clusterName, relevantEvents) ||
hasScaleEvent(relevantEvents)) {
notifyChangeListener(address, clusterName);
}
+ // these events should not be processed again even if the next
CM query result contains them
+ relevantEvents.forEach(re ->
processedEvents.put(re.auditEvent.getId(), 1L));
}
}
}
@@ -250,7 +259,34 @@ public class PollingConfigurationAnalyzer implements
Runnable {
log.stoppedClouderaManagerConfigMonitor();
}
- private boolean hasConfigChanged(String address, String clusterName,
List<StartEvent> relevantEvents) {
+ /**
+ * Checks whether any not yet processed event reports a created or deleted role (an upscale or downscale).
+ */
+ private boolean hasScaleEvent(List<RelevantEvent> relevantEvents) {
+ for (RelevantEvent event : relevantEvents) {
+ if (alreadyProcessed(event)) {
+ log.activationEventAlreadyProcessed(event.auditEvent.getId());
+ continue;
+ }
+ if (event.getRole() != null) {
+ if (event.isRoleAddedEvent()) {
+ log.foundUpScaleEvent(event.getRole(), event.getHosts());
+ return true;
+ }
+ if (event.isRoleDeletedEvent()) {
+ log.foundDownScaleEvent(event.getRole(), event.getHosts());
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private boolean alreadyProcessed(RelevantEvent event) {
+ return processedEvents.getIfPresent(event.auditEvent.getId()) != null;
+ }
+
+ private boolean hasConfigChanged(String address, String clusterName,
List<RelevantEvent> relevantEvents) {
// If there are start events, then check the previously-recorded
properties for the same service to
// identify if the configuration has changed
final Map<String, ServiceConfigurationModel> serviceConfigurations =
@@ -260,12 +296,16 @@ public class PollingConfigurationAnalyzer implements
Runnable {
final List<String> handledServiceTypes = new ArrayList<>();
boolean configHasChanged = false;
- for (StartEvent re : relevantEvents) {
- if (processedEvents.getIfPresent(re.auditEvent.getId()) != null) {
+ for (RelevantEvent re : relevantEvents) {
+ if (alreadyProcessed(re)) {
log.activationEventAlreadyProcessed(re.auditEvent.getId());
continue;
}
+ if (re.isRoleAddedEvent() || re.isRoleDeletedEvent()) {
+ continue;
+ }
+
String serviceType = re.getServiceType();
if (CM_SERVICE_TYPE.equals(serviceType)) {
@@ -309,9 +349,6 @@ public class PollingConfigurationAnalyzer implements
Runnable {
}
}
- // these events should not be processed again even if the next CM query
result contains them
- relevantEvents.forEach(re -> processedEvents.put(re.auditEvent.getId(),
1L));
-
return configHasChanged;
}
@@ -420,8 +457,8 @@ public class PollingConfigurationAnalyzer implements
Runnable {
*
- * @return A List of StartEvent objects for service start events since the last time they were queried.
+ * @return A List of RelevantEvent objects for service start and scale events since the last time they were queried.
*/
- private List<StartEvent> getRelevantEvents(final String address, final
String clusterName) {
- List<StartEvent> relevantEvents = new ArrayList<>();
+ private List<RelevantEvent> getRelevantEvents(final String address, final
String clusterName) {
+ List<RelevantEvent> relevantEvents = new ArrayList<>();
// Get the last event query timestamp
Instant lastTimestamp = getEventQueryTimestamp(address, clusterName);
@@ -446,8 +483,8 @@ public class PollingConfigurationAnalyzer implements
Runnable {
log.noActivationEventFound();
} else {
for (ApiEvent event : events) {
- if(isRelevantEvent(event)) {
- relevantEvents.add(new StartEvent(event));
+ if(isStartEvent(event) || isScaleEvent(event)) {
+ relevantEvents.add(new RelevantEvent(event));
}
}
}
@@ -456,17 +493,28 @@ public class PollingConfigurationAnalyzer implements
Runnable {
}
@SuppressWarnings("unchecked")
- private boolean isRelevantEvent(ApiEvent event) {
+ private boolean isStartEvent(ApiEvent event) {
final Map<String, Object> attributeMap =
getAttributeMap(event.getAttributes());
final String command = getAttribute(attributeMap, COMMAND);
final String status = getAttribute(attributeMap, COMMAND_STATUS);
- final String serviceType = getAttribute(attributeMap,
StartEvent.ATTR_SERVICE_TYPE);
+ final String serviceType = getAttribute(attributeMap,
RelevantEvent.ATTR_SERVICE_TYPE);
final boolean serviceModelGeneratorExists =
serviceModelGeneratorsHolder.getServiceModelGenerators(serviceType) != null;
- final boolean relevant = ACTIVATION_COMMANDS.contains(command) &&
SUCCEEDED_STATUS.equals(status) && serviceModelGeneratorExists;
+ final boolean relevant = START_COMMANDS.contains(command) &&
SUCCEEDED_STATUS.equals(status) && serviceModelGeneratorExists;
log.activationEventRelevance(event.getId(), String.valueOf(relevant),
command, status, serviceType, serviceModelGeneratorExists);
return relevant;
}
+ private boolean isScaleEvent(ApiEvent event) {
+ final Map<String, Object> attributeMap =
getAttributeMap(event.getAttributes());
+ final String serviceType = getAttribute(attributeMap,
RelevantEvent.ATTR_SERVICE_TYPE);
+ final String eventCode = getAttribute(attributeMap,
RelevantEvent.ATTR_EVENT_CODE);
+ final boolean serviceModelGeneratorExists =
serviceModelGeneratorsHolder.getServiceModelGenerators(serviceType) != null;
+ final boolean relevant = serviceModelGeneratorExists &&
+ (CREATED_EVENT_CODES.contains(eventCode) ||
DELETED_EVENT_CODES.contains(eventCode));
+ log.scaleEventRelevance(event.getId(), String.valueOf(relevant), eventCode, serviceType, serviceModelGeneratorExists);
+ return relevant;
+ }
+
@SuppressWarnings("unchecked")
private String getAttribute( Map<String, Object> attributeMap, String
attributeName) {
return attributeMap.containsKey(attributeName) ? ((List<String>)
attributeMap.get(attributeName)).get(0) : "";
@@ -605,11 +653,14 @@ public class PollingConfigurationAnalyzer implements
Runnable {
/**
- * Internal representation of a ClouderaManager service start event
+ * Internal representation of a ClouderaManager service start or role scale (created/deleted) event
*/
- static final class StartEvent {
+ static final class RelevantEvent {
private static final String ATTR_CLUSTER = "CLUSTER";
private static final String ATTR_SERVICE_TYPE = "SERVICE_TYPE";
private static final String ATTR_SERVICE = "SERVICE";
+ private static final String ATTR_ROLE = "ROLE_TYPE";
+ private static final String ATTR_HOST = "HOSTS";
+ private static final String ATTR_EVENT_CODE = "EVENTCODE";
private static List<String> attrsOfInterest = new ArrayList<>();
@@ -617,14 +668,20 @@ public class PollingConfigurationAnalyzer implements
Runnable {
attrsOfInterest.add(ATTR_CLUSTER);
attrsOfInterest.add(ATTR_SERVICE_TYPE);
attrsOfInterest.add(ATTR_SERVICE);
+ attrsOfInterest.add(ATTR_ROLE);
+ attrsOfInterest.add(ATTR_HOST);
+ attrsOfInterest.add(ATTR_EVENT_CODE);
}
private ApiEvent auditEvent;
private String clusterName;
private String serviceType;
private String service;
+ private String role;
+ private String eventCode;
+ private final Set<String> hosts = new HashSet<>();
- StartEvent(final ApiEvent auditEvent) {
+ RelevantEvent(final ApiEvent auditEvent) {
if (ApiEventCategory.AUDIT_EVENT != auditEvent.getCategory()) {
throw new IllegalArgumentException("Invalid event category " +
auditEvent.getCategory().getValue());
}
@@ -652,6 +709,22 @@ public class PollingConfigurationAnalyzer implements
Runnable {
return service;
}
+ Set<String> getHosts() {
+ return Collections.unmodifiableSet(hosts);
+ }
+
+ String getRole() {
+ return role;
+ }
+
+ boolean isRoleAddedEvent() {
+ return CREATED_EVENT_CODES.contains(eventCode);
+ }
+
+ boolean isRoleDeletedEvent() {
+ return DELETED_EVENT_CODES.contains(eventCode);
+ }
+
private void setPropertyFromAttribute(final ApiEventAttribute attribute) {
switch (attribute.getName()) {
case ATTR_CLUSTER:
@@ -663,9 +736,19 @@ public class PollingConfigurationAnalyzer implements
Runnable {
case ATTR_SERVICE:
service = attribute.getValues().get(0);
break;
+ case ATTR_HOST:
+ if (attribute.getValues() != null &&
!attribute.getValues().isEmpty()) {
+ hosts.addAll(attribute.getValues());
+ }
+ break;
+ case ATTR_ROLE:
+ role = attribute.getValues().get(0);
+ break;
+ case ATTR_EVENT_CODE:
+ eventCode = attribute.getValues().get(0);
+ break;
default:
}
}
}
-
}
diff --git
a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
index 33f5e8f93..d6dc186f4 100644
---
a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
+++
b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
@@ -381,6 +381,38 @@ public class PollingConfigurationAnalyzerTest {
}
}
+ @Test
+ public void testNotificationSentAfterDownScaleEvent() {
+ final String clusterName = "Cluster T";
+ final ApiEvent scaleEvent = createRoleScaleEvent(clusterName, PollingConfigurationAnalyzer.EVENT_CODE_ROLE_DELETED);
+ doTestEventWithConfigChange(scaleEvent, clusterName);
+ }
+
+ @Test
+ public void testNotificationSentAfterUpScaleEvent() {
+ final String clusterName = "Cluster T";
+ final ApiEvent scaleEvent = createRoleScaleEvent(clusterName, PollingConfigurationAnalyzer.EVENT_CODE_ROLE_CREATED);
+ doTestEventWithConfigChange(scaleEvent, clusterName);
+ }
+
+ /**
+ * Creates a ClouderaManager audit event for a Hive on Tez role carrying the given event code.
+ *
+ * @param clusterName the name of the cluster the event refers to
+ * @param eventCode the value of the EVENTCODE attribute (role created or role deleted)
+ * @return the audit event
+ */
+ private ApiEvent createRoleScaleEvent(final String clusterName, final String eventCode) {
+ final List<ApiEventAttribute> attrs = new ArrayList<>();
+ attrs.add(createEventAttribute("CLUSTER", clusterName));
+ attrs.add(createEventAttribute("SERVICE_TYPE", HiveOnTezServiceModelGenerator.SERVICE_TYPE));
+ attrs.add(createEventAttribute("SERVICE", HiveOnTezServiceModelGenerator.SERVICE));
+ attrs.add(createEventAttribute("ROLE_TYPE", HiveOnTezServiceModelGenerator.ROLE_TYPE));
+ attrs.add(createEventAttribute("REVISION", "215"));
+ attrs.add(createEventAttribute("EVENTCODE", eventCode));
+ return createApiEvent(ApiEventCategory.AUDIT_EVENT, attrs, null);
+ }
+
private void doTestStartEvent(final ApiEventCategory category) {
final String clusterName = "My Cluster";
final String serviceType = NameNodeServiceModelGenerator.SERVICE_TYPE;
@@ -392,7 +424,7 @@ public class PollingConfigurationAnalyzerTest {
apiEventAttrs.add(createEventAttribute("SERVICE", service));
ApiEvent apiEvent = createApiEvent(category, apiEventAttrs, null);
- PollingConfigurationAnalyzer.StartEvent restartEvent = new
PollingConfigurationAnalyzer.StartEvent(apiEvent);
+ PollingConfigurationAnalyzer.RelevantEvent restartEvent = new
PollingConfigurationAnalyzer.RelevantEvent(apiEvent);
assertNotNull(restartEvent);
assertEquals(clusterName, restartEvent.getClusterName());
assertEquals(serviceType, restartEvent.getServiceType());