This is an automated email from the ASF dual-hosted git repository.

amagyar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git


The following commit(s) were added to refs/heads/master by this push:
     new 1da5edc9f KNOX-2959 - Auto discovery to support scaling scenarios 
(#796)
1da5edc9f is described below

commit 1da5edc9f044a83262f485e2cdb767384a92038c
Author: Attila Magyar <[email protected]>
AuthorDate: Wed Oct 4 13:14:42 2023 +0200

    KNOX-2959 - Auto discovery to support scaling scenarios (#796)
---
 .../ClouderaManagerServiceDiscoveryMessages.java   |  11 ++
 .../cm/monitor/PollingConfigurationAnalyzer.java   | 121 +++++++++++++++++----
 .../monitor/PollingConfigurationAnalyzerTest.java  |  34 +++++-
 3 files changed, 146 insertions(+), 20 deletions(-)

diff --git 
a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
 
b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
index 4c1842fa2..ca331241d 100644
--- 
a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
+++ 
b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
@@ -16,6 +16,8 @@
  */
 package org.apache.knox.gateway.topology.discovery.cm;
 
+import java.util.Set;
+
 import com.cloudera.api.swagger.client.ApiException;
 import org.apache.knox.gateway.i18n.messages.Message;
 import org.apache.knox.gateway.i18n.messages.MessageLevel;
@@ -199,6 +201,9 @@ public interface ClouderaManagerServiceDiscoveryMessages {
   @Message(level = MessageLevel.DEBUG, text = "Activation event relevance: {0} 
= {1} ({2} / {3} / {4} / {5})")
   void activationEventRelevance(String eventId, String relevance, String 
command, String status, String serviceType, boolean 
serviceModelGeneratorExists);
 
+  @Message(level = MessageLevel.DEBUG, text = "Scale event relevance: {0} = 
{1} ({2} / {3} / {4})")
+  void scaleEventRelevance(String eventId, String relevance, String eventCode, 
String serviceType, boolean serviceModelGeneratorExists);
+
   @Message(level = MessageLevel.DEBUG, text = "Activation event - {0} - has 
already been processed, skipping ...")
   void activationEventAlreadyProcessed(String eventId);
 
@@ -264,4 +269,10 @@ public interface ClouderaManagerServiceDiscoveryMessages {
 
   @Message(level = MessageLevel.WARN, text = "The configured maximum retry 
attempts of {0} may overlap with the configured polling interval settings; 
using {1} retry attempts")
   void updateMaxRetryAttempts(int configured, int actual);
+
+  @Message(level = MessageLevel.DEBUG, text = "Found upscale event for role: 
{0} hosts: {1}")
+  void foundUpScaleEvent(String role, Set<String> hosts);
+
+  @Message(level = MessageLevel.DEBUG, text = "Found downscale event for role: 
{0} hosts: {1}")
+  void foundDownScaleEvent(String role, Set<String> hosts);
 }
diff --git 
a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
 
b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
index 1c0374d81..c4e511d36 100644
--- 
a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
+++ 
b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
@@ -61,6 +61,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -91,10 +92,16 @@ public class PollingConfigurationAnalyzer implements 
Runnable {
   static final String CM_SERVICE_TYPE = "ManagerServer";
   static final String CM_SERVICE      = "ClouderaManager";
 
+  public static final String EVENT_CODE_ROLE_DELETED = "EV_ROLE_DELETED";
+  public static final String EVENT_CODE_ROLE_CREATED = "EV_ROLE_CREATED";
+
   // Collection of those commands which represent the potential activation of 
service configuration changes
-  private static final Collection<String> ACTIVATION_COMMANDS = 
Arrays.asList(START_COMMAND, RESTART_COMMAND, ROLLING_RESTART_COMMAND,
+  private static final Collection<String> START_COMMANDS = 
Arrays.asList(START_COMMAND, RESTART_COMMAND, ROLLING_RESTART_COMMAND,
       RESTART_WAITING_FOR_STALENESS_SUCCESS_COMMAND);
 
+  private static final Collection<String> DELETED_EVENT_CODES = 
Arrays.asList(EVENT_CODE_ROLE_DELETED);
+  private static final Collection<String> CREATED_EVENT_CODES = 
Arrays.asList(EVENT_CODE_ROLE_CREATED);
+
   // The format of the filter employed when start events are queried from 
ClouderaManager
   private static final String EVENTS_QUERY_FORMAT =
                                 "category==" + 
ApiEventCategory.AUDIT_EVENT.getValue() +
@@ -222,14 +229,16 @@ public class PollingConfigurationAnalyzer implements 
Runnable {
             // Configuration changes don't mean anything without corresponding 
service start/restarts. Therefore, monitor
             // start events, and check the configuration only of the restarted 
service(s) to identify changes
             // that should trigger re-discovery.
-            final List<StartEvent> relevantEvents = getRelevantEvents(address, 
clusterName);
+            final List<RelevantEvent> relevantEvents = 
getRelevantEvents(address, clusterName);
 
             // If there are no recent start events, then nothing to do now
             if (!relevantEvents.isEmpty()) {
               // If a change has occurred, notify the listeners
-              if (hasConfigChanged(address, clusterName, relevantEvents)) {
+              if (hasConfigChanged(address, clusterName, relevantEvents) || 
hasScaleEvent(relevantEvents)) {
                 notifyChangeListener(address, clusterName);
               }
+              // these events should not be processed again even if the next 
CM query result contains them
+              relevantEvents.forEach(re -> 
processedEvents.put(re.auditEvent.getId(), 1L));
             }
           }
         }
@@ -250,7 +259,34 @@ public class PollingConfigurationAnalyzer implements 
Runnable {
     log.stoppedClouderaManagerConfigMonitor();
   }
 
-  private boolean hasConfigChanged(String address, String clusterName, 
List<StartEvent> relevantEvents) {
+  private boolean hasScaleEvent(List<RelevantEvent> relevantEvents) {
+    boolean found = false;
+    for (RelevantEvent event: relevantEvents) {
+      if (alreadyProcessed(event)) {
+        log.activationEventAlreadyProcessed(event.auditEvent.getId());
+        continue;
+      }
+      if (event.getRole() != null) {
+        if (event.isRoleAddedEvent()) {
+          log.foundUpScaleEvent(event.getRole(), event.getHosts());
+          found = true;
+          break;
+        }
+        if (event.isRoleDeletedEvent()) {
+          log.foundDownScaleEvent(event.getRole(), event.getHosts());
+          found = true;
+          break;
+        }
+      }
+    }
+    return found;
+  }
+
+  private boolean alreadyProcessed(RelevantEvent event) {
+    return processedEvents.getIfPresent(event.auditEvent.getId()) != null;
+  }
+
+  private boolean hasConfigChanged(String address, String clusterName, 
List<RelevantEvent> relevantEvents) {
     // If there are start events, then check the previously-recorded 
properties for the same service to
     // identify if the configuration has changed
     final Map<String, ServiceConfigurationModel> serviceConfigurations =
@@ -260,12 +296,16 @@ public class PollingConfigurationAnalyzer implements 
Runnable {
     final List<String> handledServiceTypes = new ArrayList<>();
 
     boolean configHasChanged = false;
-    for (StartEvent re : relevantEvents) {
-      if (processedEvents.getIfPresent(re.auditEvent.getId()) != null) {
+    for (RelevantEvent re : relevantEvents) {
+      if (alreadyProcessed(re)) {
         log.activationEventAlreadyProcessed(re.auditEvent.getId());
         continue;
       }
 
+      if (re.isRoleAddedEvent() || re.isRoleDeletedEvent()) {
+        continue;
+      }
+
       String serviceType = re.getServiceType();
 
       if (CM_SERVICE_TYPE.equals(serviceType)) {
@@ -309,9 +349,6 @@ public class PollingConfigurationAnalyzer implements 
Runnable {
       }
     }
 
-    // these events should not be processed again even if the next CM query 
result contains them
-    relevantEvents.forEach(re -> processedEvents.put(re.auditEvent.getId(), 
1L));
-
     return configHasChanged;
   }
 
@@ -420,8 +457,8 @@ public class PollingConfigurationAnalyzer implements 
Runnable {
    *
    * @return A List of StartEvent objects for service start events since the 
last time they were queried.
    */
-  private List<StartEvent> getRelevantEvents(final String address, final 
String clusterName) {
-    List<StartEvent> relevantEvents = new ArrayList<>();
+  private List<RelevantEvent> getRelevantEvents(final String address, final 
String clusterName) {
+    List<RelevantEvent> relevantEvents = new ArrayList<>();
 
     // Get the last event query timestamp
     Instant lastTimestamp = getEventQueryTimestamp(address, clusterName);
@@ -446,8 +483,8 @@ public class PollingConfigurationAnalyzer implements 
Runnable {
       log.noActivationEventFound();
     } else {
       for (ApiEvent event : events) {
-        if(isRelevantEvent(event)) {
-          relevantEvents.add(new StartEvent(event));
+        if(isStartEvent(event) || isScaleEvent(event)) {
+          relevantEvents.add(new RelevantEvent(event));
         }
       }
     }
@@ -456,17 +493,28 @@ public class PollingConfigurationAnalyzer implements 
Runnable {
   }
 
   @SuppressWarnings("unchecked")
-  private boolean isRelevantEvent(ApiEvent event) {
+  private boolean isStartEvent(ApiEvent event) {
     final Map<String, Object> attributeMap = 
getAttributeMap(event.getAttributes());
     final String command = getAttribute(attributeMap, COMMAND);
     final String status = getAttribute(attributeMap, COMMAND_STATUS);
-    final String serviceType = getAttribute(attributeMap, 
StartEvent.ATTR_SERVICE_TYPE);
+    final String serviceType = getAttribute(attributeMap, 
RelevantEvent.ATTR_SERVICE_TYPE);
     final boolean serviceModelGeneratorExists = 
serviceModelGeneratorsHolder.getServiceModelGenerators(serviceType) != null;
-    final boolean relevant = ACTIVATION_COMMANDS.contains(command) && 
SUCCEEDED_STATUS.equals(status) && serviceModelGeneratorExists;
+    final boolean relevant = START_COMMANDS.contains(command) && 
SUCCEEDED_STATUS.equals(status) && serviceModelGeneratorExists;
     log.activationEventRelevance(event.getId(), String.valueOf(relevant), 
command, status, serviceType, serviceModelGeneratorExists);
     return relevant;
   }
 
+  private boolean isScaleEvent(ApiEvent event) {
+    final Map<String, Object> attributeMap = 
getAttributeMap(event.getAttributes());
+    final String serviceType = getAttribute(attributeMap, 
RelevantEvent.ATTR_SERVICE_TYPE);
+    final String eventCode = getAttribute(attributeMap, 
RelevantEvent.ATTR_EVENT_CODE);
+    final boolean serviceModelGeneratorExists = 
serviceModelGeneratorsHolder.getServiceModelGenerators(serviceType) != null;
+    final boolean relevant = serviceModelGeneratorExists &&
+            (CREATED_EVENT_CODES.contains(eventCode) || 
DELETED_EVENT_CODES.contains(eventCode));
+    log.scaleEventRelevance(event.getId(), String.valueOf(relevant), 
eventCode, serviceType, relevant);
+    return relevant;
+  }
+
   @SuppressWarnings("unchecked")
   private String getAttribute( Map<String, Object> attributeMap, String 
attributeName) {
     return attributeMap.containsKey(attributeName) ? ((List<String>) 
attributeMap.get(attributeName)).get(0) : "";
@@ -605,11 +653,14 @@ public class PollingConfigurationAnalyzer implements 
Runnable {
   /**
    * Internal representation of a ClouderaManager service start event
    */
-  static final class StartEvent {
+  static final class RelevantEvent {
 
     private static final String ATTR_CLUSTER      = "CLUSTER";
     private static final String ATTR_SERVICE_TYPE = "SERVICE_TYPE";
     private static final String ATTR_SERVICE      = "SERVICE";
+    private static final String ATTR_ROLE         = "ROLE_TYPE";
+    private static final String ATTR_HOST         = "HOSTS";
+    private static final String ATTR_EVENT_CODE   = "EVENTCODE";
 
     private static List<String> attrsOfInterest = new ArrayList<>();
 
@@ -617,14 +668,20 @@ public class PollingConfigurationAnalyzer implements 
Runnable {
       attrsOfInterest.add(ATTR_CLUSTER);
       attrsOfInterest.add(ATTR_SERVICE_TYPE);
       attrsOfInterest.add(ATTR_SERVICE);
+      attrsOfInterest.add(ATTR_ROLE);
+      attrsOfInterest.add(ATTR_HOST);
+      attrsOfInterest.add(ATTR_EVENT_CODE);
     }
 
     private ApiEvent auditEvent;
     private String clusterName;
     private String serviceType;
     private String service;
+    private String role;
+    private String eventCode;
+    private Set<String> hosts = new HashSet<>();
 
-    StartEvent(final ApiEvent auditEvent) {
+    RelevantEvent(final ApiEvent auditEvent) {
       if (ApiEventCategory.AUDIT_EVENT != auditEvent.getCategory()) {
         throw new IllegalArgumentException("Invalid event category " + 
auditEvent.getCategory().getValue());
       }
@@ -652,6 +709,22 @@ public class PollingConfigurationAnalyzer implements 
Runnable {
       return service;
     }
 
+    Set<String> getHosts() {
+      return hosts;
+    }
+
+    String getRole() {
+      return role;
+    }
+
+    boolean isRoleAddedEvent() {
+      return EVENT_CODE_ROLE_CREATED.equals(eventCode);
+    }
+
+    boolean isRoleDeletedEvent() {
+      return EVENT_CODE_ROLE_DELETED.equals(eventCode);
+    }
+
     private void setPropertyFromAttribute(final ApiEventAttribute attribute) {
       switch (attribute.getName()) {
         case ATTR_CLUSTER:
@@ -663,9 +736,19 @@ public class PollingConfigurationAnalyzer implements 
Runnable {
         case ATTR_SERVICE:
           service = attribute.getValues().get(0);
           break;
+        case ATTR_HOST:
+          if (attribute.getValues() != null && 
!attribute.getValues().isEmpty()) {
+            hosts.addAll(attribute.getValues());
+          }
+          break;
+        case ATTR_ROLE:
+          role = attribute.getValues().get(0);
+          break;
+        case ATTR_EVENT_CODE:
+          eventCode = attribute.getValues().get(0);
+          break;
         default:
       }
     }
   }
-
 }
diff --git 
a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
 
b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
index 33f5e8f93..d6dc186f4 100644
--- 
a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
+++ 
b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
@@ -381,6 +381,38 @@ public class PollingConfigurationAnalyzerTest {
     }
   }
 
+  @Test
+  public void testNotificationSentAfterDownScaleEvent() {
+    final String clusterName = "Cluster T";
+
+    final List<ApiEventAttribute> revisionEventAttrs = new ArrayList<>();
+    revisionEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
+    revisionEventAttrs.add(createEventAttribute("SERVICE_TYPE", 
HiveOnTezServiceModelGenerator.SERVICE_TYPE));
+    revisionEventAttrs.add(createEventAttribute("SERVICE", 
HiveOnTezServiceModelGenerator.SERVICE));
+    revisionEventAttrs.add(createEventAttribute("ROLE_TYPE", 
HiveOnTezServiceModelGenerator.ROLE_TYPE));
+    revisionEventAttrs.add(createEventAttribute("REVISION", "215"));
+    revisionEventAttrs.add(createEventAttribute("EVENTCODE", 
PollingConfigurationAnalyzer.EVENT_CODE_ROLE_DELETED));
+    final ApiEvent revisionEvent = 
createApiEvent(ApiEventCategory.AUDIT_EVENT, revisionEventAttrs, null);
+
+    doTestEventWithConfigChange(revisionEvent, clusterName);
+  }
+
+  @Test
+  public void testNotificationSentAfterUpScaleEvent() {
+    final String clusterName = "Cluster T";
+
+    final List<ApiEventAttribute> revisionEventAttrs = new ArrayList<>();
+    revisionEventAttrs.add(createEventAttribute("CLUSTER", clusterName));
+    revisionEventAttrs.add(createEventAttribute("SERVICE_TYPE", 
HiveOnTezServiceModelGenerator.SERVICE_TYPE));
+    revisionEventAttrs.add(createEventAttribute("SERVICE", 
HiveOnTezServiceModelGenerator.SERVICE));
+    revisionEventAttrs.add(createEventAttribute("ROLE_TYPE", 
HiveOnTezServiceModelGenerator.ROLE_TYPE));
+    revisionEventAttrs.add(createEventAttribute("REVISION", "215"));
+    revisionEventAttrs.add(createEventAttribute("EVENTCODE", 
PollingConfigurationAnalyzer.EVENT_CODE_ROLE_CREATED));
+    final ApiEvent revisionEvent = 
createApiEvent(ApiEventCategory.AUDIT_EVENT, revisionEventAttrs, null);
+
+    doTestEventWithConfigChange(revisionEvent, clusterName);
+  }
+
   private void doTestStartEvent(final ApiEventCategory category) {
     final String clusterName = "My Cluster";
     final String serviceType = NameNodeServiceModelGenerator.SERVICE_TYPE;
@@ -392,7 +424,7 @@ public class PollingConfigurationAnalyzerTest {
     apiEventAttrs.add(createEventAttribute("SERVICE", service));
     ApiEvent apiEvent = createApiEvent(category, apiEventAttrs, null);
 
-    PollingConfigurationAnalyzer.StartEvent restartEvent = new 
PollingConfigurationAnalyzer.StartEvent(apiEvent);
+    PollingConfigurationAnalyzer.RelevantEvent restartEvent = new 
PollingConfigurationAnalyzer.RelevantEvent(apiEvent);
     assertNotNull(restartEvent);
     assertEquals(clusterName, restartEvent.getClusterName());
     assertEquals(serviceType, restartEvent.getServiceType());

Reply via email to