This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 52d916355 [GOBBLIN-1718] Define DagActionStoreMonitor to listen for 
kill/resume… (#3572)
52d916355 is described below

commit 52d91635515cb9056a616ea237f4276121c89b64
Author: umustafi <[email protected]>
AuthorDate: Mon Oct 17 14:41:14 2022 -0700

    [GOBBLIN-1718] Define DagActionStoreMonitor to listen for kill/resume… 
(#3572)
    
    * [GOBBLIN-1718] Define DagActionStoreMonitor to listen for kill/resume 
events
    
    * creates new monitor extending HighLevelConsumer
    * defines a DagActionStoreEvent schema used for the Kafka events consumed
    
    * refactor monitors to remove shared code
    
    * fixed small errors regarding throwing errors versus emitting log/metric
    
    * update comments
    
    * refactor to move all kafka topic handling logic to consumer client
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../src/main/avro/DagActionStoreChangeEvent.avsc   |  28 ++++
 ...angeEvent.avsc => GenericStoreChangeEvent.avsc} |   6 +-
 .../gobblin/runtime/metrics/RuntimeMetrics.java    |   3 +
 .../modules/core/GobblinServiceGuiceModule.java    |   3 +
 .../modules/core/GobblinServiceManager.java        |   5 +
 .../service/modules/orchestration/DagManager.java  |  26 +--
 .../service/monitoring/ChangeMonitorUtils.java     |  50 ++++++
 .../monitoring/DagActionStoreChangeMonitor.java    | 176 +++++++++++++++++++++
 .../DagActionStoreChangeMonitorFactory.java        |  63 ++++++++
 .../service/monitoring/SpecStoreChangeMonitor.java |  72 +++------
 .../monitoring/SpecStoreChangeMonitorFactory.java  |   3 +-
 11 files changed, 369 insertions(+), 66 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
new file mode 100644
index 000000000..268f18ad0
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
@@ -0,0 +1,28 @@
+{
+  "type" : "record",
+  "name" : "DagActionStoreChangeEvent",
+  "namespace" : "org.apache.gobblin.service.monitoring",
+  "doc" : "Contains information to identify a change that occurred in the 
DagActionStore which contains pending kill or resume requests",
+  "fields" : [ {
+    "name" : "changeEventIdentifier",
+    "type" : "GenericStoreChangeEvent",
+    "doc" : "properties common to any store's change event",
+    "compliance" : "NONE"
+  },{
+    "name" : "flowGroup",
+    "type" : "string",
+    "doc" : "flow group for the dag action",
+    "compliance" : "NONE"
+  }, {
+    "name" : "flowName",
+    "type" : "string",
+    "doc" : "flow name for the dag action",
+    "compliance" : "NONE"
+  }, {
+    "name" : "flowExecutionId",
+    "type" : "string",
+    "doc" : "flow execution id for the dag action",
+    "compliance" : "NONE"
+  }
+  ]
+}
\ No newline at end of file
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/SpecStoreChangeEvent.avsc
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc
similarity index 83%
rename from 
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/SpecStoreChangeEvent.avsc
rename to 
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc
index a5df13697..c2298f460 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/SpecStoreChangeEvent.avsc
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc
@@ -1,12 +1,12 @@
 {
   "type" : "record",
-  "name" : "SpecStoreChangeEvent",
+  "name" : "GenericStoreChangeEvent",
   "namespace" : "org.apache.gobblin.service.monitoring",
-  "doc" : "Contains information to identify a change that occurred in the 
FlowSpecStore",
+  "doc" : "Contains information to identify a change that occurred in any 
store producing change events",
   "fields" : [ {
     "name" : "key",
     "type" : "string",
-    "doc" : "Primary key for the FlowSpecStore (spec_uri)",
+    "doc" : "Primary key for the store",
     "compliance" : "NONE"
   }, {
     "name" : "timestamp",
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index d714468e4..c63419d54 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -40,6 +40,9 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.specStoreMonitor.failed.added.specs";
   public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.specStoreMonitor.deleted.specs";
   public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.specStoreMonitor.unexpected.errors";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.dagActionStore.kills.invoked";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED 
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.dagActionStore.resumes.invoked";
+  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.dagActionStore.unexpected.errors";
 
   // Metadata keys
   public static final String TOPIC = "topic";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index a543e6641..16ef015a8 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -25,6 +25,8 @@ import 
org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 //import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
+import 
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
 import org.apache.gobblin.service.monitoring.GitConfigMonitor;
 import org.apache.helix.HelixManager;
 import org.slf4j.Logger;
@@ -237,6 +239,7 @@ public class GobblinServiceGuiceModule implements Module {
 
     if (serviceConfig.isWarmStandbyEnabled()) {
       
binder.bind(SpecStoreChangeMonitor.class).toProvider(SpecStoreChangeMonitorFactory.class).in(Singleton.class);
+      
binder.bind(DagActionStoreChangeMonitor.class).toProvider(DagActionStoreChangeMonitorFactory.class).in(Singleton.class);
     }
 
     binder.bind(GobblinServiceManager.class);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index c07df5087..8845410a7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -32,6 +32,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
 import org.apache.gobblin.service.monitoring.GitConfigMonitor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -215,6 +216,9 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
   @Inject(optional = true)
   protected SpecStoreChangeMonitor specStoreChangeMonitor;
 
+  @Inject(optional = true)
+  protected DagActionStoreChangeMonitor dagActionStoreChangeMonitor;
+
   @Inject
   protected GobblinServiceManager(GobblinServiceConfiguration configuration) 
throws Exception {
     this.configuration = Objects.requireNonNull(configuration);
@@ -384,6 +388,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
 
     if (this.configuration.isWarmStandbyEnabled()) {
       this.serviceLauncher.addService(specStoreChangeMonitor);
+      this.serviceLauncher.addService(dagActionStoreChangeMonitor);
     }
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 321f93b9a..4391a5f9b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -328,30 +328,38 @@ public class DagManager extends AbstractIdleService {
 
   @Subscribe
   public void handleKillFlowEvent(KillFlowEvent killFlowEvent) {
+    handleKillFlowRequest(killFlowEvent.getFlowGroup(), 
killFlowEvent.getFlowName(), killFlowEvent.getFlowExecutionId());
+  }
+
+  // Method used to handle kill flow requests received from subscriber-event 
model or from direct invocation
+  public void handleKillFlowRequest(String flowGroup, String flowName, long 
flowExecutionId) {
     if (isActive) {
-      log.info("Received kill request for flow ({}, {}, {})", 
killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(),
-          killFlowEvent.getFlowExecutionId());
+      log.info("Received kill request for flow ({}, {}, {})", flowGroup, 
flowName, flowExecutionId);
       try {
-        killFlow(killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(), 
killFlowEvent.getFlowExecutionId());
+        killFlow(flowGroup, flowName, flowExecutionId);
       } catch (IOException e) {
         log.warn("Failed to kill flow", e);
       }
     }
   }
 
-  @Subscribe
-  public void handleResumeFlowEvent(ResumeFlowEvent resumeFlowEvent) {
+  // Method used to handle resume flow requests received from subscriber-event 
model or from direct invocation
+  public void handleResumeFlowRequest(String flowGroup, String flowName, long 
flowExecutionId) {
     if (isActive) {
-      log.info("Received resume request for flow ({}, {}, {})", 
resumeFlowEvent.getFlowGroup(), resumeFlowEvent.getFlowName(), 
resumeFlowEvent.getFlowExecutionId());
-      DagId dagId = 
DagManagerUtils.generateDagId(resumeFlowEvent.getFlowGroup(), 
resumeFlowEvent.getFlowName(),
-          resumeFlowEvent.getFlowExecutionId());
-      int queueId = 
DagManagerUtils.getDagQueueId(resumeFlowEvent.getFlowExecutionId(), 
this.numThreads);
+      log.info("Received resume request for flow ({}, {}, {})", flowGroup, 
flowName, flowExecutionId);
+      DagId dagId = DagManagerUtils.generateDagId(flowGroup, flowName, 
flowExecutionId);
+      int queueId = DagManagerUtils.getDagQueueId(flowExecutionId, 
this.numThreads);
       if (!this.resumeQueue[queueId].offer(dagId)) {
         log.warn("Could not add dag " + dagId + " to resume queue");
       }
     }
   }
 
+  @Subscribe
+  public void handleResumeFlowEvent(ResumeFlowEvent resumeFlowEvent) {
+    handleResumeFlowRequest(resumeFlowEvent.getFlowGroup(), 
resumeFlowEvent.getFlowName(), resumeFlowEvent.getFlowExecutionId());
+  }
+
   public synchronized void setTopologySpecMap(Map<URI, TopologySpec> 
topologySpecMap) {
     this.topologySpecMap = topologySpecMap;
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
new file mode 100644
index 000000000..33934ef06
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.cache.LoadingCache;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public final class ChangeMonitorUtils {
+  private ChangeMonitorUtils() {
+    return;
+  }
+
+  /**
+   * Performs checks for duplicate messages and heartbeat operation prior to 
processing a message. Returns true if
+   * the pre-conditions above don't apply and we should proceed processing the 
change event
+   */
+  public static boolean shouldProcessMessage(String changeIdentifier, 
LoadingCache<String, String> cache,
+      String operation, String timestamp) {
+    // If we've already processed a message with this timestamp and key before 
then skip duplicate message
+    if (cache.getIfPresent(changeIdentifier) != null) {
+      log.debug("Duplicate change event with identifier {}", changeIdentifier);
+      return false;
+    }
+
+    // If event is a heartbeat type then log it and skip processing
+    if (operation.equals("HEARTBEAT")) {
+      log.debug("Received heartbeat message from time {}", timestamp);
+      return false;
+    }
+
+    return true;
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
new file mode 100644
index 000000000..52be719f1
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+
+
+/**
+ * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent} 
schema to process Kafka messages received
+ * from its corresponding consumer client. This monitor responds to requests 
to resume or delete a flow and acts as a
+ * connector between the API and execution layers of GaaS.
+ */
+@Slf4j
+public class DagActionStoreChangeMonitor extends HighLevelConsumer {
+  public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX = 
"dagActionChangeStore";
+
+  // Metrics
+  ContextAwareMeter killsInvoked;
+  ContextAwareMeter resumesInvoked;
+  ContextAwareMeter unexpectedErrors;
+
+  protected CacheLoader<String, String> cacheLoader = new CacheLoader<String, 
String>() {
+    @Override
+    public String load(String key) throws Exception {
+      return key;
+    }
+  };
+
+  protected LoadingCache<String, String>
+      dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10, 
TimeUnit.MINUTES).build(cacheLoader);
+
+  @Inject
+  protected DagActionStore dagActionStore;
+
+  @Inject
+  protected DagManager dagManager;
+
+  // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
+  // client itself to determine all Kafka related information dynamically 
rather than through the config.
+  public DagActionStoreChangeMonitor(String topic, Config config, int 
numThreads) {
+    // Differentiate group id for each host
+    super(topic, config.withValue(GROUP_ID_KEY,
+        ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + 
UUID.randomUUID().toString())),
+        numThreads);
+  }
+
+  @Override
+  protected void assignTopicPartitions() {
+    // Expects underlying consumer to handle initializing partitions and 
offset for the topic -
+    // subscribe to all partitions from latest offset
+    return;
+  }
+
+  @Override
+  /*
+  This class is multi-threaded and this method will be called by multiple 
threads, however any given message will be
+  partitioned and processed by only one thread (and corresponding queue).
+   */
+  protected void processMessage(DecodeableKafkaRecord message) {
+    // TODO: Add metric that service is healthy and we're continuously 
processing messages.
+    String key = (String) message.getKey();
+    DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) 
message.getValue();
+
+    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String operation = 
value.getChangeEventIdentifier().getOperationType().name();
+    String flowGroup = value.getFlowGroup();
+    String flowName = value.getFlowName();
+    String flowExecutionId = value.getFlowExecutionId();
+
+    log.debug("Processing Dag Action message for flow group: {} name: {} 
executionId: {} timestamp {} operation {}",
+        flowGroup, flowName, flowExecutionId, timestamp, operation);
+
+    String changeIdentifier = timestamp + key;
+    if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, 
dagActionsSeenCache, operation,
+        timestamp.toString())) {
+      return;
+    }
+
+    // Retrieve the Dag Action taken from MySQL table unless operation is 
DELETE
+    DagActionStore.DagActionValue dagAction = null;
+    if (!operation.equals("DELETE")) {
+      try {
+        dagAction = dagActionStore.getDagAction(flowGroup, flowName, 
flowExecutionId).getDagActionValue();
+      } catch (IOException e) {
+        log.warn("Encountered IOException trying to retrieve dagAction for 
flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, 
flowName, flowExecutionId, e);
+        this.unexpectedErrors.mark();
+      } catch (SpecNotFoundException e) {
+        log.warn("DagAction not found for flow group: {} name: {} executionId: 
{} Exception: {}", flowGroup, flowName,
+            flowExecutionId, e);
+        this.unexpectedErrors.mark();
+      } catch (SQLException throwables) {
+        log.warn("Encountered SQLException trying to retrieve dagAction for 
flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, 
flowName, flowExecutionId, throwables);
+        throwables.printStackTrace();
+      }
+    }
+
+    // We only expect INSERT and DELETE operations done to this table. INSERTs 
correspond to resume or delete flow
+    // requests that have to be processed. DELETEs require no action.
+    try {
+      if (operation.equals("INSERT")) {
+        if (dagAction.equals(DagActionStore.DagActionValue.RESUME)) {
+          dagManager.handleResumeFlowRequest(flowGroup, 
flowName,Long.parseLong(flowExecutionId));
+          this.resumesInvoked.mark();
+        } else if (dagAction.equals(DagActionStore.DagActionValue.KILL)) {
+          dagManager.handleKillFlowRequest(flowGroup, flowName, 
Long.parseLong(flowExecutionId));
+          this.killsInvoked.mark();
+        } else {
+          log.warn("Received unsupported dagAction {}. Expected to be a KILL 
or RESUME", dagAction);
+          this.unexpectedErrors.mark();
+          return;
+        }
+      } else if (operation.equals("UPDATE")) {
+        log.warn("Received an UPDATE action to the DagActionStore when values 
in this store are never supposed to be "
+            + "updated. Flow group: {} name {} executionId {} were updated to 
action {}", flowGroup, flowName,
+            flowExecutionId, dagAction);
+        this.unexpectedErrors.mark();
+      } else if (operation.equals("DELETE")) {
+        log.debug("Deleted flow group: {} name: {} executionId {} from 
DagActionStore", flowGroup, flowName, flowExecutionId);
+      } else {
+        log.warn("Received unsupported change type of operation {}. Expected 
values to be in [INSERT, UPDATE, DELETE]",
+            operation);
+        this.unexpectedErrors.mark();
+        return;
+      }
+    } catch (Exception e) {
+      log.warn("Ran into unexpected error processing DagActionStore changes: 
{}", e);
+      this.unexpectedErrors.mark();
+    }
+
+    dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
+  }
+
+  @Override
+  protected void createMetrics() {
+    super.createMetrics();
+    this.killsInvoked = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
+    this.resumesInvoked = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
+    this.unexpectedErrors = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
+  }
+
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
new file mode 100644
index 000000000..dd8cfad94
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.util.Objects;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import javax.inject.Provider;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * A factory implementation that returns a {@link DagActionStoreChangeMonitor} 
instance.
+ */
+@Slf4j
+public class DagActionStoreChangeMonitorFactory implements 
Provider<DagActionStoreChangeMonitor> {
+  static final String DAG_ACTION_STORE_CHANGE_MONITOR_CLASS_NAME = 
"org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor";
+  static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = 
"numThreads";
+
+  private final Config config;
+
+  @Inject
+  public DagActionStoreChangeMonitorFactory(Config config) { this.config = 
Objects.requireNonNull(config); }
+
+  private DagActionStoreChangeMonitor createDagActionStoreMonitor()
+    throws ReflectiveOperationException {
+    Config dagActionStoreChangeConfig = 
config.getConfig(DagActionStoreChangeMonitor.DAG_ACTION_CHANGE_MONITOR_PREFIX);
+    String topic = ""; // Pass empty string because we expect underlying 
client to dynamically determine the Kafka topic
+    int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig, 
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
+
+    return (DagActionStoreChangeMonitor) 
GobblinConstructorUtils.invokeConstructor(
+        Class.forName(DAG_ACTION_STORE_CHANGE_MONITOR_CLASS_NAME), topic, 
dagActionStoreChangeConfig, numThreads);
+  }
+
+  @Override
+  public DagActionStoreChangeMonitor get() {
+    try {
+      return createDagActionStoreMonitor();
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException("Failed to initialize DagActionStoreMonitor 
due to ", e);
+    }
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index b9563141e..f58cb551c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -19,14 +19,8 @@ package org.apache.gobblin.service.monitoring;
 
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
 
 import org.apache.commons.text.StringEscapeUtils;
 
@@ -34,7 +28,6 @@ import com.codahale.metrics.Meter;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
@@ -49,14 +42,10 @@ import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
-import org.apache.gobblin.source.extractor.extract.LongWatermark;
-import 
org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
 
 
 /**
- * A Flow Spec Store change monitor that uses {@link SpecStoreChangeEvent} 
schema to process Kafka messages received
+ * A Flow Spec Store change monitor that uses {@link GenericStoreChangeEvent} 
schema to process Kafka messages received
  * from the consumer service. This monitor responds to changes to flow specs 
(creations, updates, deletes) and acts as
  * a connector between the API and execution layers of GaaS.
  */
@@ -86,6 +75,8 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer 
{
   @Inject
   protected GobblinServiceJobScheduler scheduler;
 
+  // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
+  // client itself to determine all Kafka related information dynamically 
rather than through the config.
   public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
     // Differentiate group id for each host
     super(topic, config.withValue(GROUP_ID_KEY,
@@ -95,26 +86,9 @@ public class SpecStoreChangeMonitor extends 
HighLevelConsumer {
 
   @Override
   protected void assignTopicPartitions() {
-    // The consumer client will assign itself to all partitions for this topic 
and consume from its latest offset.
-    List<KafkaTopic> kafkaTopicList = 
this.getGobblinKafkaConsumerClient().getFilteredTopics(Collections.EMPTY_LIST,
-        Lists.newArrayList(Pattern.compile(this.topic)));
-
-    List<KafkaPartition> kafkaPartitions = new ArrayList();
-    for (KafkaTopic topic : kafkaTopicList) {
-      kafkaPartitions.addAll(topic.getPartitions());
-    }
-
-    Map<KafkaPartition, LongWatermark> partitionLongWatermarkMap = new 
HashMap<>();
-    for (KafkaPartition partition : kafkaPartitions) {
-      try {
-        partitionLongWatermarkMap.put(partition, new 
LongWatermark(this.getGobblinKafkaConsumerClient().getLatestOffset(partition)));
-      } catch (KafkaOffsetRetrievalFailureException e) {
-        log.warn("Failed to retrieve latest Kafka offset, consuming from 
beginning for partition {} due to {}",
-            partition, e);
-        partitionLongWatermarkMap.put(partition, new LongWatermark(0L));
-      }
-    }
-    this.getGobblinKafkaConsumerClient().assignAndSeek(kafkaPartitions, 
partitionLongWatermarkMap);
+    // Expects underlying consumer to handle initializing partitions and 
offset for the topic -
+    // subscribe to all partitions from latest offset
+    return;
   }
 
   @Override
@@ -123,22 +97,18 @@ public class SpecStoreChangeMonitor extends 
HighLevelConsumer {
   associated with it), a given message itself will be partitioned and assigned 
to only one queue.
    */
   protected void processMessage(DecodeableKafkaRecord message) {
-    String specUri = (String) message.getKey();
-    SpecStoreChangeEvent value = (SpecStoreChangeEvent) message.getValue();
+    // TODO: Add metric that service is healthy and we're continuously 
processing messages.
+    String key = (String) message.getKey();
+    GenericStoreChangeEvent value = (GenericStoreChangeEvent) 
message.getValue();
 
     Long timestamp = value.getTimestamp();
     String operation = value.getOperationType().name();
-    log.info("Processing message with specUri is {} timestamp is {} operation 
is {}", specUri, timestamp, operation);
 
-    // If we've already processed a message with this timestamp and spec uri 
before then skip duplicate message
-    String changeIdentifier = timestamp.toString() + specUri;
-    if (specChangesSeenCache.getIfPresent(changeIdentifier) != null) {
-      return;
-    }
+    log.debug("Processing message where specUri is {} timestamp is {} 
operation is {}", key, timestamp, operation);
 
-    // If event is a heartbeat type then log it and skip processing
-    if (operation == "HEARTBEAT") {
-      log.debug("Received heartbeat message from time {}", timestamp);
+    String changeIdentifier = timestamp + key;
+    if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, 
specChangesSeenCache, operation,
+        timestamp.toString())) {
       return;
     }
 
@@ -146,23 +116,21 @@ public class SpecStoreChangeMonitor extends 
HighLevelConsumer {
     URI specAsUri = null;
 
     try {
-      specAsUri = new URI(specUri);
+      specAsUri = new URI(key);
     } catch (URISyntaxException e) {
-      if (operation == "DELETE") {
-        log.warn("Could not create URI object for specUri {} due to error {}", 
specUri, e.getMessage());
-        this.unexpectedErrors.mark();
-        return;
-      }
+      log.warn("Could not create URI object for specUri {} due to error {}", 
key, e.getMessage());
+      this.unexpectedErrors.mark();
+      return;
     }
 
-    spec = (operation != "DELETE") ? 
this.flowCatalog.getSpecWrapper(specAsUri) : null;
+    spec = (!operation.equals("DELETE")) ? 
this.flowCatalog.getSpecWrapper(specAsUri) : null;
 
     // The monitor should continue to process messages regardless of failures 
with individual messages, instead we use
     // metrics to keep track of failure to process certain SpecStoreChange 
events
     try {
       // Call respective action for the type of change received
       AddSpecResponse response;
-      if (operation == "INSERT" || operation == "UPDATE") {
+      if (operation.equals("INSERT") || operation.equals("UPDATE")) {
         response = scheduler.onAddSpec(spec);
 
         // Null response means the dag failed to compile
@@ -175,7 +143,7 @@ public class SpecStoreChangeMonitor extends 
HighLevelConsumer {
               + "invalidate the earlier compilation. Examine changes to locate 
error. Response is {}", spec, response);
           this.failedAddedSpecs.mark();
         }
-      } else if (operation == "DELETE") {
+      } else if (operation.equals("DELETE")) {
         scheduler.onDeleteSpec(specAsUri, FlowSpec.Builder.DEFAULT_VERSION);
         this.deletedSpecs.mark();
       } else {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
index 3e873f331..901685660 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
@@ -35,7 +35,6 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 @Slf4j
 public class SpecStoreChangeMonitorFactory implements 
Provider<SpecStoreChangeMonitor> {
   static final String SPEC_STORE_CHANGE_MONITOR_CLASS_NAME = 
"org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor";
-  static final String SPEC_STORE_CHANGE_MONITOR_TOPIC_KEY = "topic";
   static final String SPEC_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = "numThreads";
 
   private final Config config;
@@ -48,7 +47,7 @@ public class SpecStoreChangeMonitorFactory implements 
Provider<SpecStoreChangeMo
   private SpecStoreChangeMonitor createSpecStoreChangeMonitor()
       throws ReflectiveOperationException {
     Config specStoreChangeConfig = 
config.getConfig(SpecStoreChangeMonitor.SPEC_STORE_CHANGE_MONITOR_PREFIX);
-    String topic = 
specStoreChangeConfig.getString(SPEC_STORE_CHANGE_MONITOR_TOPIC_KEY);
+    String topic = ""; // Pass empty string because we expect underlying 
client to dynamically determine the Kafka topic
     int numThreads = ConfigUtils.getInt(specStoreChangeConfig, 
SPEC_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
 
     return (SpecStoreChangeMonitor) GobblinConstructorUtils.invokeConstructor(


Reply via email to