This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5339332d9 Add framework and unit tests for DagActionStoreChangeMonitor 
(#3817)
5339332d9 is described below

commit 5339332d9c0d0651c7a1d3f8c07ea96a1bc6d4ad
Author: umustafi <[email protected]>
AuthorDate: Wed Nov 8 10:06:01 2023 -0800

    Add framework and unit tests for DagActionStoreChangeMonitor (#3817)
    
    * Add framework and unit tests for DagActionStoreChangeMonitor
    
    * Add more test cases and validation
    
    * Add header for new file
    
    * Move FlowSpec static function to Utils class
    
    * Remove unused import
    
    * Fix compile error
    
    * Fix unit tests
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../runtime/DagActionStoreChangeMonitorTest.java   | 237 +++++++++++++++++++++
 .../org/apache/gobblin/runtime/api/FlowSpec.java   |  18 +-
 .../apache/gobblin/runtime/api/FlowSpecTest.java   |   4 +-
 .../monitoring/DagActionStoreChangeMonitor.java    |  11 +-
 4 files changed, 254 insertions(+), 16 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
new file mode 100644
index 000000000..4025cf0db
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import java.net.URI;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
+import org.apache.gobblin.service.monitoring.DagActionValue;
+import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent;
+import org.apache.gobblin.service.monitoring.OperationType;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests the main functionality of {@link DagActionStoreChangeMonitor} to 
process {@link DagActionStoreChangeEvent} type
+ * events stored in a {@link 
org.apache.gobblin.kafka.client.KafkaConsumerRecord}. The
+ * processMessage(DecodeableKafkaRecord message) function should be able to 
gracefully process a variety of message
+ * types, even with undesired formats, without throwing exceptions.
+ */
+@Slf4j
+public class DagActionStoreChangeMonitorTest {
+  public static final String TOPIC = 
DagActionStoreChangeEvent.class.getSimpleName();
+  private final int PARTITION = 1;
+  private final int OFFSET = 1;
+  private final String FLOW_GROUP = "flowGroup";
+  private final String FLOW_NAME = "flowName";
+  private final String FLOW_EXECUTION_ID = "123";
+  private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor;
+  private int txidCounter = 0;
+
+  /**
+   * Note: The class methods are wrapped in test-specific methods because the 
original methods are protected
+   * and cannot be accessed directly by this test class.
+   */
+  class MockDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {
+
+    public MockDagActionStoreChangeMonitor(String topic, Config config, int 
numThreads,
+        boolean isMultiActiveSchedulerEnabled) {
+      super(topic, config, mock(DagActionStore.class), mock(DagManager.class), 
numThreads, mock(FlowCatalog.class),
+          mock(Orchestrator.class), isMultiActiveSchedulerEnabled);
+    }
+
+    protected void processMessageForTest(DecodeableKafkaRecord record) {
+      super.processMessage(record);
+
+    }
+
+    protected void startUpForTest() {
+      super.startUp();
+    }
+  }
+
+  MockDagActionStoreChangeMonitor createMockDagActionStoreChangeMonitor() {
+    Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, 
ConfigValueFactory.fromAnyRef("localhost:0000"))
+        
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, 
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore"))
+        .withValue("zookeeper.connect", 
ConfigValueFactory.fromAnyRef("localhost:2121"));
+    return new MockDagActionStoreChangeMonitor("dummyTopic", config, 5, true);
+  }
+
+  // Called at start of every test so the count of each method being called is 
reset to 0
+  public void setup() {
+     mockDagActionStoreChangeMonitor = createMockDagActionStoreChangeMonitor();
+     mockDagActionStoreChangeMonitor.startUpForTest();
+  }
+
+  /**
+   * Ensure no NPE results from passing a HEARTBEAT type message with a null 
{@link DagActionValue} and the message is
+   * filtered out since it's a heartbeat type so no methods are called.
+   */
+  @Test
+  public void testProcessMessageWithHeartbeatAndNullDagAction() throws 
SpecNotFoundException {
+    setup();
+    Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
+        wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "", 
null);
+    mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+    verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
+    verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
+    // Note: Indirectly verifies submitFlowToDagManagerHelper is not called; it 
is not on a mocked object so it cannot be asserted directly
+    verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), 
times(0)).getSpecs(any(URI.class));
+  }
+
+  /**
+   * Ensure a HEARTBEAT type message with non-empty flow information is 
filtered out since it's a heartbeat type so no
+   * methods are called.
+   */
+  @Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndNullDagAction")
+  public void testProcessMessageWithHeartbeatAndFlowInfo() throws 
SpecNotFoundException {
+    setup();
+    Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
+        wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP, 
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME);
+    mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+    verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
+    verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
+    verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), 
times(0)).getSpecs(any(URI.class));
+  }
+
+  /**
+   * Tests process message with an INSERT type message of a `launch` action
+   */
+  @Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndFlowInfo")
+  public void testProcessMessageWithInsertLaunchType() throws 
SpecNotFoundException {
+    setup();
+    Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
+        wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, 
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
+    mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+    verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
+    verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
+    verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), 
times(1)).getSpecs(any(URI.class));
+  }
+
+  /**
+   * Tests process message with an INSERT type message of a `resume` action. 
It re-uses the same flow information; however,
+   * since a different txid is used every time, it will be considered unique 
and submit a resume request.
+   */
+  @Test (dependsOnMethods = "testProcessMessageWithInsertLaunchType")
+  public void testProcessMessageWithInsertResumeType() throws 
SpecNotFoundException {
+    setup();
+    Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
+        wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, 
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME);
+    mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+    verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(1)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
+    verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
+    verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), 
times(0)).getSpecs(any(URI.class));
+  }
+
+  /**
+   * Tests process message with an INSERT type message of a `kill` action. 
Similar to `testProcessMessageWithInsertResumeType`.
+   */
+  @Test (dependsOnMethods = "testProcessMessageWithInsertResumeType")
+  public void testProcessMessageWithInsertKillType() throws 
SpecNotFoundException {
+    setup();
+    Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
+        wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, 
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.KILL);
+    mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+    verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
+    verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(1)).handleKillFlowRequest(anyString(), anyString(), anyLong());
+    verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), 
times(0)).getSpecs(any(URI.class));
+  }
+
+  /**
+   * Tests process message with an UPDATE type message of the 'launch' action 
above. Although processMessage does not
+   * expect this message type, it should handle it gracefully.
+   */
+  @Test (dependsOnMethods = "testProcessMessageWithInsertKillType")
+  public void testProcessMessageWithUpdate() throws SpecNotFoundException {
+    setup();
+    Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
+        wrapDagActionStoreChangeEvent(OperationType.UPDATE, FLOW_GROUP, 
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
+    mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+    verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
+    verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
+    verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), 
times(0)).getSpecs(any(URI.class));
+  }
+
+  /**
+   * Tests process message with a DELETE type message which should be ignored 
regardless of the flow information.
+   */
+  @Test (dependsOnMethods = "testProcessMessageWithUpdate")
+  public void testProcessMessageWithDelete() throws SpecNotFoundException {
+    setup();
+    Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
+        wrapDagActionStoreChangeEvent(OperationType.DELETE, FLOW_GROUP, 
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
+    mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
+    verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleResumeFlowRequest(anyString(), anyString(), anyLong());
+    verify(mockDagActionStoreChangeMonitor.getDagManager(), 
times(0)).handleKillFlowRequest(anyString(), anyString(), anyLong());
+    verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), 
times(0)).getSpecs(any(URI.class));
+  }
+
+  /**
+   * Util to create a general DagActionStoreChange type event
+   */
+  private DagActionStoreChangeEvent 
createDagActionStoreChangeEvent(OperationType operationType,
+      String flowGroup, String flowName, String flowExecutionId, 
DagActionValue dagAction) {
+    String key = getKeyForFlow(flowGroup, flowName, flowExecutionId);
+    GenericStoreChangeEvent genericStoreChangeEvent =
+        new GenericStoreChangeEvent(key, String.valueOf(txidCounter), 
System.currentTimeMillis(), operationType);
+    txidCounter++;
+    return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup, 
flowName, flowExecutionId, dagAction);
+  }
+
+  /**
+   * Form a key for events using the flow identifiers
+   * @return a key formed by adding an '_' delimiter between the flow 
identifiers
+   */
+  private String getKeyForFlow(String flowGroup, String flowName, String 
flowExecutionId) {
+    return flowGroup + "_" + flowName + "_" + flowExecutionId;
+  }
+
+  /**
+   * Util to create wrapper around DagActionStoreChangeEvent
+   */
+  private Kafka09ConsumerClient.Kafka09ConsumerRecord 
wrapDagActionStoreChangeEvent(OperationType operationType, String flowGroup, 
String flowName,
+      String flowExecutionId, DagActionValue dagAction) {
+    DagActionStoreChangeEvent eventToProcess = null;
+    try {
+      eventToProcess =
+          createDagActionStoreChangeEvent(operationType, flowGroup, flowName, 
flowExecutionId, dagAction);
+    } catch (Exception e) {
+      log.error("Exception while creating event ", e);
+    }
+    // TODO: handle partition and offset values better
+    ConsumerRecord consumerRecord = new ConsumerRecord<>(TOPIC, PARTITION, 
OFFSET,
+        getKeyForFlow(flowGroup, flowName, flowExecutionId), eventToProcess);
+    return new Kafka09ConsumerClient.Kafka09ConsumerRecord(consumerRecord);
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index 2df547596..7b887f472 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -518,15 +518,15 @@ public class FlowSpec implements Configurable, Spec {
       return URI_SCHEME.length() + ":".length() // URI separator
         + URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_NAME_LENGTH 
+ URI_PATH_SEPARATOR.length() + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH;
     }
-  }
 
-  /**
-   * Create a new FlowSpec object with the added property defined by path and 
value parameters
-   * @param path key for new property
-   * @param value
-   */
-  public static FlowSpec createFlowSpecWithProperty(FlowSpec flowSpec, String 
path, String value) {
-    Config updatedConfig = flowSpec.getConfig().withValue(path, 
ConfigValueFactory.fromAnyRef(value));
-    return new Builder(flowSpec.getUri()).withConfig(updatedConfig).build();
+    /**
+     * Create a new FlowSpec object with the added property defined by path 
and value parameters
+     * @param path key for new property
+     * @param value
+     */
+    public static FlowSpec createFlowSpecWithProperty(FlowSpec flowSpec, 
String path, String value) {
+      Config updatedConfig = flowSpec.getConfig().withValue(path, 
ConfigValueFactory.fromAnyRef(value));
+      return new Builder(flowSpec.getUri()).withConfig(updatedConfig).build();
+    }
   }
 }
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
index 793abd222..6ef5ca1cc 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/FlowSpecTest.java
@@ -26,8 +26,6 @@ import org.apache.gobblin.service.FlowId;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.apache.gobblin.runtime.api.FlowSpec.*;
-
 
 public class FlowSpecTest {
 
@@ -51,7 +49,7 @@ public class FlowSpecTest {
     properties.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, 
"true");
 
     FlowSpec originalFlowSpec = 
FlowSpec.builder(flowUri).withConfigAsProperties(properties).build();
-    FlowSpec updatedFlowSpec = createFlowSpecWithProperty(originalFlowSpec, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
+    FlowSpec updatedFlowSpec = 
FlowSpec.Utils.createFlowSpecWithProperty(originalFlowSpec, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
 
     Properties updatedProperties = updatedFlowSpec.getConfigAsProperties();
     
Assert.assertEquals(updatedProperties.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
 flowExecutionId);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index c8d9c62b7..bbe1e47ab 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.service.monitoring;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -27,6 +28,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
@@ -42,8 +44,6 @@ import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 
-import static org.apache.gobblin.runtime.api.FlowSpec.*;
-
 
 /**
  * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent} 
schema to process Kafka messages received
@@ -79,10 +79,13 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
       dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10, 
TimeUnit.MINUTES).build(cacheLoader);
 
   protected DagActionStore dagActionStore;
-
+  @Getter
+  @VisibleForTesting
   protected DagManager dagManager;
   protected Orchestrator orchestrator;
   protected boolean isMultiActiveSchedulerEnabled;
+  @Getter
+  @VisibleForTesting
   protected FlowCatalog flowCatalog;
 
   // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
@@ -200,7 +203,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
       URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
       spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
       // Adds flowExecutionId to config to ensure they are consistent across 
hosts
-      FlowSpec updatedSpec = createFlowSpecWithProperty(spec, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
+      FlowSpec updatedSpec = FlowSpec.Utils.createFlowSpecWithProperty(spec, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
       this.orchestrator.submitFlowToDagManager(updatedSpec);
     } catch (URISyntaxException e) {
       log.warn("Could not create URI object for flowId {}. Exception {}", 
flowId, e.getMessage());

Reply via email to