umustafi commented on code in PR #3973:
URL: https://github.com/apache/gobblin/pull/3973#discussion_r1640236504


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -224,31 +224,41 @@ protected void processMessage(DecodeableKafkaRecord message) {
     DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
         dagActionType);
 
+    handleDagAction(operation, dagAction, flowGroup, flowName, flowExecutionId, dagActionType);
+
+    dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
+  }
+
+  protected void handleDagAction(String operation, DagActionStore.DagAction dagAction, String flowGroup, String flowName,
+      long flowExecutionId, DagActionStore.DagActionType dagActionType) {
     // We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of
     // {@link DagActionStore.FlowActionType} flow requests that have to be processed. DELETEs require no action.
     try {
-      if (operation.equals("INSERT")) {
-        handleDagAction(dagAction, false);
-      } else if (operation.equals("UPDATE")) {
-        // TODO: change this warning message and process updates if for launch or reevaluate type
-        log.warn("Received an UPDATE action to the DagActionStore when values in this store are never supposed to be "
-            + "updated. Flow group: {} name {} executionId {} were updated to action {}", flowGroup, flowName,
-            flowExecutionId, dagActionType);
-        this.unexpectedErrors.mark();
-      } else if (operation.equals("DELETE")) {
-        log.debug("Deleted dagAction from DagActionStore: {}", dagAction);
-      } else {
-        log.warn("Received unsupported change type of operation {}. Expected values to be in [INSERT, UPDATE, DELETE]",
-            operation);
-        this.unexpectedErrors.mark();
-        return;
+      switch (operation) {

Review Comment:
   nice improvement



##########
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagManagementDagActionStoreChangeMonitorTest.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.quartz.SchedulerException;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManagement;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
+import org.apache.gobblin.service.monitoring.DagActionValue;
+import org.apache.gobblin.service.monitoring.DagManagementDagActionStoreChangeMonitor;
+import org.apache.gobblin.service.monitoring.GenericStoreChangeEvent;
+import org.apache.gobblin.service.monitoring.OperationType;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests the main functionality of {@link DagManagementDagActionStoreChangeMonitor} to process {@link DagActionStoreChangeEvent} type
+ * events stored in a {@link org.apache.gobblin.kafka.client.KafkaConsumerRecord}. The
+ * processMessage(DecodeableKafkaRecord message) function should be able to gracefully process a variety of message
+ * types, even with undesired formats, without throwing exceptions.
+ */
+@Slf4j
+public class DagManagementDagActionStoreChangeMonitorTest {
+  public static final String TOPIC = DagActionStoreChangeEvent.class.getSimpleName();
+  private final int PARTITION = 1;
+  private final int OFFSET = 1;
+  private final String FLOW_GROUP = "flowGroup";
+  private final String FLOW_NAME = "flowName";
+  private final long FLOW_EXECUTION_ID = 123L;
+  private final String JOB_NAME = "jobName";
+  private MockDagManagementDagActionStoreChangeMonitor mockDagManagementDagActionStoreChangeMonitor;
+  private int txidCounter = 0;
+
+  private static final DagActionReminderScheduler dagActionReminderScheduler = mock(DagActionReminderScheduler.class);
+
+  /**
+   * Note: The class methods are wrapped in a test specific method because the original methods are package protected
+   * and cannot be accessed by this class.
+   */
+  static class MockDagManagementDagActionStoreChangeMonitor extends DagManagementDagActionStoreChangeMonitor {
+
+    public MockDagManagementDagActionStoreChangeMonitor(Config config, int numThreads, boolean isMultiActiveSchedulerEnabled) {
+      super(config, numThreads, mock(FlowCatalog.class), mock(Orchestrator.class), mock(DagManagementStateStore.class),
+          isMultiActiveSchedulerEnabled, mock(DagManagement.class), dagActionReminderScheduler);
+    }
+    protected void processMessageForTest(DecodeableKafkaRecord<String, DagActionStoreChangeEvent> record) {
+      super.processMessage(record);
+    }
+  }
+
+  MockDagManagementDagActionStoreChangeMonitor createMockDagManagementDagActionStoreChangeMonitor() {
+    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
+        .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore"))
+        .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
+    return new MockDagManagementDagActionStoreChangeMonitor(config, 5, true);
+  }
+
+  // Called at start of every test so the count of each method being called is reset to 0
+  @BeforeMethod
+  public void setupMockMonitor() {
+     mockDagManagementDagActionStoreChangeMonitor = createMockDagManagementDagActionStoreChangeMonitor();
+  }
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    doNothing().when(dagActionReminderScheduler).unscheduleReminderJob(any());
+
+  }
+
+  /**
+   * Tests process message with a DELETE type message which should be ignored regardless of the flow information.

Review Comment:
   Is this description accurate? The DELETE is not ignored anymore.
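   If DELETE now does real work here (e.g. unscheduling the reminder job, which is what the mocked
   unscheduleReminderJob in setUp() suggests), the Javadoc could say so. A rough sketch only; the exact
   wording depends on the new behavior:

     /**
      * Tests processMessage with a DELETE type message, which should no longer be ignored but instead
      * unschedule any pending reminder for the corresponding dag action.
      */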



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java:
##########
@@ -37,16 +40,56 @@
 @Slf4j
 public class DagManagementDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {
   private final DagManagement dagManagement;
+  @VisibleForTesting @Getter
+  private final DagActionReminderScheduler dagActionReminderScheduler;
 
   // Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the consumer
   // client itself to determine all Kafka related information dynamically rather than through the config.
   public DagManagementDagActionStoreChangeMonitor(Config config, int numThreads,
       FlowCatalog flowCatalog, Orchestrator orchestrator, DagManagementStateStore dagManagementStateStore,
-      boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement) {
+      boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement, DagActionReminderScheduler dagActionReminderScheduler) {

Review Comment:
   Do you need to make sure the initialization order in Guice works here?
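   For what it's worth, Guice resolves constructor dependencies from the object graph rather than from
   declaration order, so as long as DagActionReminderScheduler is bound (e.g. as a singleton) it will be
   constructed before any monitor that injects it. A self-contained toy sketch; all class and module names
   below are made up and stand in for the real ones, this is not the actual Gobblin service module:

     import com.google.inject.AbstractModule;
     import com.google.inject.Guice;
     import com.google.inject.Inject;
     import com.google.inject.Injector;
     import com.google.inject.Singleton;

     public class InitOrderSketch {
       // stand-in for DagActionReminderScheduler
       @Singleton
       static class ReminderScheduler { }

       // stand-in for DagManagementDagActionStoreChangeMonitor
       static class ChangeMonitor {
         final ReminderScheduler scheduler;
         @Inject ChangeMonitor(ReminderScheduler scheduler) { this.scheduler = scheduler; }
       }

       public static void main(String[] args) {
         Injector injector = Guice.createInjector(new AbstractModule() {
           @Override protected void configure() {
             // binding order here does not matter; Guice walks the dependency graph
             bind(ChangeMonitor.class);
             bind(ReminderScheduler.class);
           }
         });
         // the monitor always comes back with a non-null, fully constructed scheduler
         System.out.println(injector.getInstance(ChangeMonitor.class).scheduler != null);
       }
     }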



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java:
##########
@@ -93,25 +81,26 @@ public void resumeDag()
     dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.FAILED);
     dag.getNodes().get(2).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
     dag.getNodes().get(4).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
-    doReturn(Optional.of(dag)).when(dagManagementStateStore).getFailedDag(any());
+    this.dagManagementStateStore.checkpointDag(dag);
+    // simulate it as a failed dag
+    this.dagManagementStateStore.markDagFailed(dag);
 
     ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new DagActionStore.DagAction(flowGroup, flowName,
         flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.RESUME),
         null, this.dagManagementStateStore));
     resumeDagProc.process(this.dagManagementStateStore);
 
-    SpecProducer<Spec> specProducer = DagManagerUtils.getSpecProducer(dag.getNodes().get(1));
-    List<SpecProducer<Spec>> otherSpecProducers = dag.getNodes().stream().map(node -> {
-      try {
-        return DagManagerUtils.getSpecProducer(node);
-      } catch (ExecutionException | InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }).filter(sp -> specProducer != sp).collect(Collectors.toList());
     int expectedNumOfResumedJobs = 1; // = number of resumed nodes
 
-    Mockito.verify(specProducer, Mockito.times(expectedNumOfResumedJobs)).addSpec(any());
+    // only the current job should have run
+    // we cannot check the spec producers if addSpec is called on them, because in resumeDagProc, a dag is stored in DMSS
+    // and retrieved, hence it goes through serialization/deserialization; in this process  the SpecProducer objects in

Review Comment:
   Also change this to use a multi-line comment.
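   e.g. roughly, covering only the lines visible in this hunk:

     /*
      * Only the current job should have run.
      * We cannot check the spec producers if addSpec is called on them, because in resumeDagProc, a dag is stored in DMSS
      * and retrieved, hence it goes through serialization/deserialization; in this process the SpecProducer objects in ...
      */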



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -228,12 +225,12 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
             this.eventProducer.emitObservabilityEvent(jobStatus);
           }
 
-          if (this.dagProcEngineEnabled) {
+          if (this.dagProcEngineEnabled && isJobLevelStatus(jobName)) {

Review Comment:
   Do you have a test for isJobLevelStatus or for this monitoring method?
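   If not, a minimal sketch of a unit test could look something like the following. Note the assumptions:
   isJobLevelStatus(String jobName) is assumed to be package-visible on KafkaJobStatusMonitor, the flow-level
   sentinel is assumed to be JobStatusRetriever.NA_KEY, and buildTestMonitor() is a hypothetical helper that
   constructs the monitor with mocked dependencies the way the existing monitor tests do:

     import org.testng.Assert;
     import org.testng.annotations.Test;

     public class KafkaJobStatusMonitorJobLevelStatusTest {

       @Test
       public void onlyJobLevelStatusesReachTheDagProcEngine() {
         KafkaJobStatusMonitor monitor = buildTestMonitor(); // hypothetical helper that mocks the monitor's deps

         // a concrete job name should be treated as a job-level status
         Assert.assertTrue(monitor.isJobLevelStatus("myJob"));
         // the flow-level sentinel job name should be filtered out
         Assert.assertFalse(monitor.isJobLevelStatus(JobStatusRetriever.NA_KEY));
       }
     }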



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java:
##########
@@ -180,4 +175,29 @@ public static Dag<JobExecutionPlan> buildDagWithMultipleNodesAtDifferentLevels(S
     }
     return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
   }
+
+  public static void mockDMSSCommonBehavior(DagManagementStateStore dagManagementStateStore) throws IOException, SpecNotFoundException {
+    doReturn(FlowSpec.builder().build()).when(dagManagementStateStore).getFlowSpec(any());
+    doNothing().when(dagManagementStateStore).tryAcquireQuota(any());
+    doReturn(true).when(dagManagementStateStore).releaseQuota(any());
+  }
+
+  public static TopologySpec buildNaiveTopologySpec(String specUriInString) {
+    String specStoreDir = "/tmp/specStoreDir";

Review Comment:
   Add comments describing what this helper sets up.
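   e.g. a short Javadoc along these lines (a suggestion only; adjust to whatever the helper actually builds):

     /**
      * Builds a minimal {@link TopologySpec} for the given spec URI so tests can register a topology without
      * standing up real infrastructure; the backing spec store lives under a throwaway /tmp directory.
      */
     public static TopologySpec buildNaiveTopologySpec(String specUriInString) {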



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProcTest.java:
##########
@@ -93,25 +81,26 @@ public void resumeDag()
     dag.getNodes().get(1).getValue().setExecutionStatus(ExecutionStatus.FAILED);
     dag.getNodes().get(2).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
     dag.getNodes().get(4).getValue().setExecutionStatus(ExecutionStatus.COMPLETE);
-    doReturn(Optional.of(dag)).when(dagManagementStateStore).getFailedDag(any());
+    this.dagManagementStateStore.checkpointDag(dag);
+    // simulate it as a failed dag
+    this.dagManagementStateStore.markDagFailed(dag);
 
     ResumeDagProc resumeDagProc = new ResumeDagProc(new ResumeDagTask(new DagActionStore.DagAction(flowGroup, flowName,
         flowExecutionId, MysqlDagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.RESUME),
         null, this.dagManagementStateStore));
     resumeDagProc.process(this.dagManagementStateStore);
 
-    SpecProducer<Spec> specProducer = DagManagerUtils.getSpecProducer(dag.getNodes().get(1));
-    List<SpecProducer<Spec>> otherSpecProducers = dag.getNodes().stream().map(node -> {
-      try {
-        return DagManagerUtils.getSpecProducer(node);
-      } catch (ExecutionException | InterruptedException e) {
-        throw new RuntimeException(e);
-      }
-    }).filter(sp -> specProducer != sp).collect(Collectors.toList());
     int expectedNumOfResumedJobs = 1; // = number of resumed nodes
 
-    Mockito.verify(specProducer, Mockito.times(expectedNumOfResumedJobs)).addSpec(any());
+    // only the current job should have run
+    // we cannot check the spec producers if addSpec is called on them, because in resumeDagProc, a dag is stored in DMSS
+    // and retrieved, hence it goes through serialization/deserialization; in this process  the SpecProducer objects in

Review Comment:
   nit: extra space before "the SpecProducer"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java:
##########
@@ -37,16 +40,56 @@
 @Slf4j
 public class DagManagementDagActionStoreChangeMonitor extends DagActionStoreChangeMonitor {
   private final DagManagement dagManagement;
+  @VisibleForTesting @Getter
+  private final DagActionReminderScheduler dagActionReminderScheduler;
 
   // Note that the topic is an empty string (rather than null to avoid NPE) because this monitor relies on the consumer
   // client itself to determine all Kafka related information dynamically rather than through the config.
   public DagManagementDagActionStoreChangeMonitor(Config config, int numThreads,
       FlowCatalog flowCatalog, Orchestrator orchestrator, DagManagementStateStore dagManagementStateStore,
-      boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement) {
+      boolean isMultiActiveSchedulerEnabled, DagManagement dagManagement, DagActionReminderScheduler dagActionReminderScheduler) {
     // DagManager is only needed in the `handleDagAction` method of its parent class and not needed in this class,
     // so we are passing a null value for DagManager to its parent class.
     super("", config, null, numThreads, flowCatalog, orchestrator, dagManagementStateStore, isMultiActiveSchedulerEnabled);
     this.dagManagement = dagManagement;
+    this.dagActionReminderScheduler = dagActionReminderScheduler;
+  }
+
+  @Override
+  protected void handleDagAction(String operation, DagActionStore.DagAction dagAction, String flowGroup, String flowName,
+      long flowExecutionId, DagActionStore.DagActionType dagActionType) {
+    // We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of
+    // {@link DagActionStore.FlowActionType} flow requests that have to be processed.
+    try {
+      switch (operation) {
+        case "INSERT":
+          handleDagAction(dagAction, false);
+          break;
+        case "UPDATE":
+          // TODO: change this warning message and process updates if for launch or reevaluate type
+          log.warn("Received an UPDATE action to the DagActionStore when values in this store are never supposed to be "

Review Comment:
   This is no longer true, as you're noting above, so we should update this message.
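   e.g. something along these lines (wording is a suggestion only; keep whatever matches the new UPDATE
   semantics):

             log.warn("Received an UPDATE to the DagActionStore that this monitor does not handle yet. "
                 + "Flow group: {} name {} executionId {} action {}", flowGroup, flowName, flowExecutionId, dagActionType);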



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
