This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 52d916355 [GOBBLIN-1718] Define DagActionStoreMonitor to listen for
kill/resume… (#3572)
52d916355 is described below
commit 52d91635515cb9056a616ea237f4276121c89b64
Author: umustafi <[email protected]>
AuthorDate: Mon Oct 17 14:41:14 2022 -0700
[GOBBLIN-1718] Define DagActionStoreMonitor to listen for kill/resume…
(#3572)
* [GOBBLIN-1718] Define DagActionStoreMonitor to listen for kill/resume
events
* creates new monitor extending HighLevelConsumer
* defines a DagActionStoreEvent schema used for the Kafka events consumed
* refactor monitors to remove shared code
* fixed small errors regarding throwing errors versus emitting log/metric
* update comments
* refactor to move all kafka topic handling logic to consumer client
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../src/main/avro/DagActionStoreChangeEvent.avsc | 28 ++++
...angeEvent.avsc => GenericStoreChangeEvent.avsc} | 6 +-
.../gobblin/runtime/metrics/RuntimeMetrics.java | 3 +
.../modules/core/GobblinServiceGuiceModule.java | 3 +
.../modules/core/GobblinServiceManager.java | 5 +
.../service/modules/orchestration/DagManager.java | 26 +--
.../service/monitoring/ChangeMonitorUtils.java | 50 ++++++
.../monitoring/DagActionStoreChangeMonitor.java | 176 +++++++++++++++++++++
.../DagActionStoreChangeMonitorFactory.java | 63 ++++++++
.../service/monitoring/SpecStoreChangeMonitor.java | 72 +++------
.../monitoring/SpecStoreChangeMonitorFactory.java | 3 +-
11 files changed, 369 insertions(+), 66 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
new file mode 100644
index 000000000..268f18ad0
--- /dev/null
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
@@ -0,0 +1,28 @@
+{
+ "type" : "record",
+ "name" : "DagActionStoreChangeEvent",
+ "namespace" : "org.apache.gobblin.service.monitoring",
+ "doc" : "Contains information to identify a change that occurred in the
DagActionStore which contains pending kill or resume requests",
+ "fields" : [ {
+ "name" : "changeEventIdentifier",
+ "type" : "GenericStoreChangeEvent",
+ "doc" : "properties common to any store's change event",
+ "compliance" : "NONE"
+ },{
+ "name" : "flowGroup",
+ "type" : "string",
+ "doc" : "flow group for the dag action",
+ "compliance" : "NONE"
+ }, {
+ "name" : "flowName",
+ "type" : "string",
+ "doc" : "flow name for the dag action",
+ "compliance" : "NONE"
+ }, {
+ "name" : "flowExecutionId",
+ "type" : "string",
+ "doc" : "flow execution id for the dag action",
+ "compliance" : "NONE"
+ }
+ ]
+}
\ No newline at end of file
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/SpecStoreChangeEvent.avsc
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc
similarity index 83%
rename from
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/SpecStoreChangeEvent.avsc
rename to
gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc
index a5df13697..c2298f460 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/SpecStoreChangeEvent.avsc
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc
@@ -1,12 +1,12 @@
{
"type" : "record",
- "name" : "SpecStoreChangeEvent",
+ "name" : "GenericStoreChangeEvent",
"namespace" : "org.apache.gobblin.service.monitoring",
- "doc" : "Contains information to identify a change that occurred in the
FlowSpecStore",
+ "doc" : "Contains information to identify a change that occurred in any
store producing change events",
"fields" : [ {
"name" : "key",
"type" : "string",
- "doc" : "Primary key for the FlowSpecStore (spec_uri)",
+ "doc" : "Primary key for the store",
"compliance" : "NONE"
}, {
"name" : "timestamp",
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index d714468e4..c63419d54 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -40,6 +40,9 @@ public class RuntimeMetrics {
public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.specStoreMonitor.failed.added.specs";
public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.specStoreMonitor.deleted.specs";
public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.specStoreMonitor.unexpected.errors";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.dagActionStore.kills.invoked";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.dagActionStore.resumes.invoked";
+ public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.dagActionStore.unexpected.errors";
// Metadata keys
public static final String TOPIC = "topic";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index a543e6641..16ef015a8 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -25,6 +25,8 @@ import
org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
//import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
+import
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
import org.apache.helix.HelixManager;
import org.slf4j.Logger;
@@ -237,6 +239,7 @@ public class GobblinServiceGuiceModule implements Module {
if (serviceConfig.isWarmStandbyEnabled()) {
binder.bind(SpecStoreChangeMonitor.class).toProvider(SpecStoreChangeMonitorFactory.class).in(Singleton.class);
+
binder.bind(DagActionStoreChangeMonitor.class).toProvider(DagActionStoreChangeMonitorFactory.class).in(Singleton.class);
}
binder.bind(GobblinServiceManager.class);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index c07df5087..8845410a7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -32,6 +32,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -215,6 +216,9 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
@Inject(optional = true)
protected SpecStoreChangeMonitor specStoreChangeMonitor;
+ @Inject(optional = true)
+ protected DagActionStoreChangeMonitor dagActionStoreChangeMonitor;
+
@Inject
protected GobblinServiceManager(GobblinServiceConfiguration configuration)
throws Exception {
this.configuration = Objects.requireNonNull(configuration);
@@ -384,6 +388,7 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
if (this.configuration.isWarmStandbyEnabled()) {
this.serviceLauncher.addService(specStoreChangeMonitor);
+ this.serviceLauncher.addService(dagActionStoreChangeMonitor);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 321f93b9a..4391a5f9b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -328,30 +328,38 @@ public class DagManager extends AbstractIdleService {
@Subscribe
public void handleKillFlowEvent(KillFlowEvent killFlowEvent) {
+ handleKillFlowRequest(killFlowEvent.getFlowGroup(),
killFlowEvent.getFlowName(), killFlowEvent.getFlowExecutionId());
+ }
+
+ // Method used to handle kill flow requests received from subscriber-event
model or from direct invocation
+ public void handleKillFlowRequest(String flowGroup, String flowName, long
flowExecutionId) {
if (isActive) {
- log.info("Received kill request for flow ({}, {}, {})",
killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(),
- killFlowEvent.getFlowExecutionId());
+ log.info("Received kill request for flow ({}, {}, {})", flowGroup,
flowName, flowExecutionId);
try {
- killFlow(killFlowEvent.getFlowGroup(), killFlowEvent.getFlowName(),
killFlowEvent.getFlowExecutionId());
+ killFlow(flowGroup, flowName, flowExecutionId);
} catch (IOException e) {
log.warn("Failed to kill flow", e);
}
}
}
- @Subscribe
- public void handleResumeFlowEvent(ResumeFlowEvent resumeFlowEvent) {
+ // Method used to handle resume flow requests received from subscriber-event
model or from direct invocation
+ public void handleResumeFlowRequest(String flowGroup, String flowName, long
flowExecutionId) {
if (isActive) {
- log.info("Received resume request for flow ({}, {}, {})",
resumeFlowEvent.getFlowGroup(), resumeFlowEvent.getFlowName(),
resumeFlowEvent.getFlowExecutionId());
- DagId dagId =
DagManagerUtils.generateDagId(resumeFlowEvent.getFlowGroup(),
resumeFlowEvent.getFlowName(),
- resumeFlowEvent.getFlowExecutionId());
- int queueId =
DagManagerUtils.getDagQueueId(resumeFlowEvent.getFlowExecutionId(),
this.numThreads);
+ log.info("Received resume request for flow ({}, {}, {})", flowGroup,
flowName, flowExecutionId);
+ DagId dagId = DagManagerUtils.generateDagId(flowGroup, flowName,
flowExecutionId);
+ int queueId = DagManagerUtils.getDagQueueId(flowExecutionId,
this.numThreads);
if (!this.resumeQueue[queueId].offer(dagId)) {
log.warn("Could not add dag " + dagId + " to resume queue");
}
}
}
+ @Subscribe
+ public void handleResumeFlowEvent(ResumeFlowEvent resumeFlowEvent) {
+ handleResumeFlowRequest(resumeFlowEvent.getFlowGroup(),
resumeFlowEvent.getFlowName(), resumeFlowEvent.getFlowExecutionId());
+ }
+
public synchronized void setTopologySpecMap(Map<URI, TopologySpec>
topologySpecMap) {
this.topologySpecMap = topologySpecMap;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
new file mode 100644
index 000000000..33934ef06
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.cache.LoadingCache;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public final class ChangeMonitorUtils {
+ private ChangeMonitorUtils() {
+ return;
+ }
+
+ /**
+ * Performs checks for duplicate messages and heartbeat operation prior to
processing a message. Returns true if
+ * the pre-conditions above don't apply and we should proceed processing the
change event
+ */
+ public static boolean shouldProcessMessage(String changeIdentifier,
LoadingCache<String, String> cache,
+ String operation, String timestamp) {
+ // If we've already processed a message with this timestamp and key before
then skip duplicate message
+ if (cache.getIfPresent(changeIdentifier) != null) {
+ log.debug("Duplicate change event with identifier {}", changeIdentifier);
+ return false;
+ }
+
+ // If event is a heartbeat type then log it and skip processing
+ if (operation.equals("HEARTBEAT")) {
+ log.debug("Received heartbeat message from time {}", timestamp);
+ return false;
+ }
+
+ return true;
+ }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
new file mode 100644
index 000000000..52be719f1
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+
+
+/**
+ * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent}
schema to process Kafka messages received
+ * from its corresponding consumer client. This monitor responds to requests
to resume or kill a flow and acts as a
+ * connector between the API and execution layers of GaaS.
+ */
+@Slf4j
+public class DagActionStoreChangeMonitor extends HighLevelConsumer {
+ public static final String DAG_ACTION_CHANGE_MONITOR_PREFIX =
"dagActionChangeStore";
+
+ // Metrics
+ ContextAwareMeter killsInvoked;
+ ContextAwareMeter resumesInvoked;
+ ContextAwareMeter unexpectedErrors;
+
+ protected CacheLoader<String, String> cacheLoader = new CacheLoader<String,
String>() {
+ @Override
+ public String load(String key) throws Exception {
+ return key;
+ }
+ };
+
+ protected LoadingCache<String, String>
+ dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10,
TimeUnit.MINUTES).build(cacheLoader);
+
+ @Inject
+ protected DagActionStore dagActionStore;
+
+ @Inject
+ protected DagManager dagManager;
+
+ // Note that the topic is an empty string (rather than null to avoid NPE)
because this monitor relies on the consumer
+ // client itself to determine all Kafka related information dynamically
rather than through the config.
+ public DagActionStoreChangeMonitor(String topic, Config config, int
numThreads) {
+ // Differentiate group id for each host
+ super(topic, config.withValue(GROUP_ID_KEY,
+ ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX +
UUID.randomUUID().toString())),
+ numThreads);
+ }
+
+ @Override
+ protected void assignTopicPartitions() {
+ // Expects underlying consumer to handle initializing partitions and
offset for the topic -
+ // subscribe to all partitions from latest offset
+ return;
+ }
+
+ @Override
+ /*
+ This class is multi-threaded and this method will be called by multiple
threads, however any given message will be
+ partitioned and processed by only one thread (and corresponding queue).
+ */
+ protected void processMessage(DecodeableKafkaRecord message) {
+ // TODO: Add metric that service is healthy and we're continuously
processing messages.
+ String key = (String) message.getKey();
+ DagActionStoreChangeEvent value = (DagActionStoreChangeEvent)
message.getValue();
+
+ Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+ String operation =
value.getChangeEventIdentifier().getOperationType().name();
+ String flowGroup = value.getFlowGroup();
+ String flowName = value.getFlowName();
+ String flowExecutionId = value.getFlowExecutionId();
+
+ log.debug("Processing Dag Action message for flow group: {} name: {}
executionId: {} timestamp {} operation {}",
+ flowGroup, flowName, flowExecutionId, timestamp, operation);
+
+ String changeIdentifier = timestamp + key;
+ if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier,
dagActionsSeenCache, operation,
+ timestamp.toString())) {
+ return;
+ }
+
+ // Retrieve the Dag Action taken from MySQL table unless operation is
DELETE
+ DagActionStore.DagActionValue dagAction = null;
+ if (!operation.equals("DELETE")) {
+ try {
+ dagAction = dagActionStore.getDagAction(flowGroup, flowName,
flowExecutionId).getDagActionValue();
+ } catch (IOException e) {
+ log.warn("Encountered IOException trying to retrieve dagAction for
flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup,
flowName, flowExecutionId, e);
+ this.unexpectedErrors.mark();
+ } catch (SpecNotFoundException e) {
+ log.warn("DagAction not found for flow group: {} name: {} executionId:
{} Exception: {}", flowGroup, flowName,
+ flowExecutionId, e);
+ this.unexpectedErrors.mark();
+ } catch (SQLException throwables) {
+ log.warn("Encountered SQLException trying to retrieve dagAction for
flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup,
flowName, flowExecutionId, throwables);
+ throwables.printStackTrace();
+ }
+ }
+
+ // We only expect INSERT and DELETE operations done to this table. INSERTs
correspond to resume or kill flow
+ // requests that have to be processed. DELETEs require no action.
+ try {
+ if (operation.equals("INSERT")) {
+ if (dagAction.equals(DagActionStore.DagActionValue.RESUME)) {
+ dagManager.handleResumeFlowRequest(flowGroup,
flowName,Long.parseLong(flowExecutionId));
+ this.resumesInvoked.mark();
+ } else if (dagAction.equals(DagActionStore.DagActionValue.KILL)) {
+ dagManager.handleKillFlowRequest(flowGroup, flowName,
Long.parseLong(flowExecutionId));
+ this.killsInvoked.mark();
+ } else {
+ log.warn("Received unsupported dagAction {}. Expected to be a KILL
or RESUME", dagAction);
+ this.unexpectedErrors.mark();
+ return;
+ }
+ } else if (operation.equals("UPDATE")) {
+ log.warn("Received an UPDATE action to the DagActionStore when values
in this store are never supposed to be "
+ + "updated. Flow group: {} name {} executionId {} were updated to
action {}", flowGroup, flowName,
+ flowExecutionId, dagAction);
+ this.unexpectedErrors.mark();
+ } else if (operation.equals("DELETE")) {
+ log.debug("Deleted flow group: {} name: {} executionId {} from
DagActionStore", flowGroup, flowName, flowExecutionId);
+ } else {
+ log.warn("Received unsupported change type of operation {}. Expected
values to be in [INSERT, UPDATE, DELETE]",
+ operation);
+ this.unexpectedErrors.mark();
+ return;
+ }
+ } catch (Exception e) {
+ log.warn("Ran into unexpected error processing DagActionStore changes:
{}", e);
+ this.unexpectedErrors.mark();
+ }
+
+ dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
+ }
+
+ @Override
+ protected void createMetrics() {
+ super.createMetrics();
+ this.killsInvoked =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
+ this.resumesInvoked =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
+ this.unexpectedErrors =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
+ }
+
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
new file mode 100644
index 000000000..dd8cfad94
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.util.Objects;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import javax.inject.Provider;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * A factory implementation that returns a {@link DagActionStoreChangeMonitor}
instance.
+ */
+@Slf4j
+public class DagActionStoreChangeMonitorFactory implements
Provider<DagActionStoreChangeMonitor> {
+ static final String DAG_ACTION_STORE_CHANGE_MONITOR_CLASS_NAME =
"org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor";
+ static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY =
"numThreads";
+
+ private final Config config;
+
+ @Inject
+ public DagActionStoreChangeMonitorFactory(Config config) { this.config =
Objects.requireNonNull(config); }
+
+ private DagActionStoreChangeMonitor createDagActionStoreMonitor()
+ throws ReflectiveOperationException {
+ Config dagActionStoreChangeConfig =
config.getConfig(DagActionStoreChangeMonitor.DAG_ACTION_CHANGE_MONITOR_PREFIX);
+ String topic = ""; // Pass empty string because we expect underlying
client to dynamically determine the Kafka topic
+ int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig,
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
+
+ return (DagActionStoreChangeMonitor)
GobblinConstructorUtils.invokeConstructor(
+ Class.forName(DAG_ACTION_STORE_CHANGE_MONITOR_CLASS_NAME), topic,
dagActionStoreChangeConfig, numThreads);
+ }
+
+ @Override
+ public DagActionStoreChangeMonitor get() {
+ try {
+ return createDagActionStoreMonitor();
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException("Failed to initialize DagActionStoreMonitor
due to ", e);
+ }
+ }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index b9563141e..f58cb551c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -19,14 +19,8 @@ package org.apache.gobblin.service.monitoring;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
import org.apache.commons.text.StringEscapeUtils;
@@ -34,7 +28,6 @@ import com.codahale.metrics.Meter;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
@@ -49,14 +42,10 @@ import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
-import org.apache.gobblin.source.extractor.extract.LongWatermark;
-import
org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
/**
- * A Flow Spec Store change monitor that uses {@link SpecStoreChangeEvent}
schema to process Kafka messages received
+ * A Flow Spec Store change monitor that uses {@link GenericStoreChangeEvent}
schema to process Kafka messages received
* from the consumer service. This monitor responds to changes to flow specs
(creations, updates, deletes) and acts as
* a connector between the API and execution layers of GaaS.
*/
@@ -86,6 +75,8 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer
{
@Inject
protected GobblinServiceJobScheduler scheduler;
+ // Note that the topic is an empty string (rather than null to avoid NPE)
because this monitor relies on the consumer
+ // client itself to determine all Kafka related information dynamically
rather than through the config.
public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
// Differentiate group id for each host
super(topic, config.withValue(GROUP_ID_KEY,
@@ -95,26 +86,9 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
@Override
protected void assignTopicPartitions() {
- // The consumer client will assign itself to all partitions for this topic
and consume from its latest offset.
- List<KafkaTopic> kafkaTopicList =
this.getGobblinKafkaConsumerClient().getFilteredTopics(Collections.EMPTY_LIST,
- Lists.newArrayList(Pattern.compile(this.topic)));
-
- List<KafkaPartition> kafkaPartitions = new ArrayList();
- for (KafkaTopic topic : kafkaTopicList) {
- kafkaPartitions.addAll(topic.getPartitions());
- }
-
- Map<KafkaPartition, LongWatermark> partitionLongWatermarkMap = new
HashMap<>();
- for (KafkaPartition partition : kafkaPartitions) {
- try {
- partitionLongWatermarkMap.put(partition, new
LongWatermark(this.getGobblinKafkaConsumerClient().getLatestOffset(partition)));
- } catch (KafkaOffsetRetrievalFailureException e) {
- log.warn("Failed to retrieve latest Kafka offset, consuming from
beginning for partition {} due to {}",
- partition, e);
- partitionLongWatermarkMap.put(partition, new LongWatermark(0L));
- }
- }
- this.getGobblinKafkaConsumerClient().assignAndSeek(kafkaPartitions,
partitionLongWatermarkMap);
+ // Expects underlying consumer to handle initializing partitions and
offset for the topic -
+ // subscribe to all partitions from latest offset
+ return;
}
@Override
@@ -123,22 +97,18 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
associated with it), a given message itself will be partitioned and assigned
to only one queue.
*/
protected void processMessage(DecodeableKafkaRecord message) {
- String specUri = (String) message.getKey();
- SpecStoreChangeEvent value = (SpecStoreChangeEvent) message.getValue();
+ // TODO: Add metric that service is healthy and we're continuously
processing messages.
+ String key = (String) message.getKey();
+ GenericStoreChangeEvent value = (GenericStoreChangeEvent)
message.getValue();
Long timestamp = value.getTimestamp();
String operation = value.getOperationType().name();
- log.info("Processing message with specUri is {} timestamp is {} operation
is {}", specUri, timestamp, operation);
- // If we've already processed a message with this timestamp and spec uri
before then skip duplicate message
- String changeIdentifier = timestamp.toString() + specUri;
- if (specChangesSeenCache.getIfPresent(changeIdentifier) != null) {
- return;
- }
+ log.debug("Processing message where specUri is {} timestamp is {}
operation is {}", key, timestamp, operation);
- // If event is a heartbeat type then log it and skip processing
- if (operation == "HEARTBEAT") {
- log.debug("Received heartbeat message from time {}", timestamp);
+ String changeIdentifier = timestamp + key;
+ if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier,
specChangesSeenCache, operation,
+ timestamp.toString())) {
return;
}
@@ -146,23 +116,21 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
URI specAsUri = null;
try {
- specAsUri = new URI(specUri);
+ specAsUri = new URI(key);
} catch (URISyntaxException e) {
- if (operation == "DELETE") {
- log.warn("Could not create URI object for specUri {} due to error {}",
specUri, e.getMessage());
- this.unexpectedErrors.mark();
- return;
- }
+ log.warn("Could not create URI object for specUri {} due to error {}",
key, e.getMessage());
+ this.unexpectedErrors.mark();
+ return;
}
- spec = (operation != "DELETE") ?
this.flowCatalog.getSpecWrapper(specAsUri) : null;
+ spec = (!operation.equals("DELETE")) ?
this.flowCatalog.getSpecWrapper(specAsUri) : null;
// The monitor should continue to process messages regardless of failures
with individual messages, instead we use
// metrics to keep track of failure to process certain SpecStoreChange
events
try {
// Call respective action for the type of change received
AddSpecResponse response;
- if (operation == "INSERT" || operation == "UPDATE") {
+ if (operation.equals("INSERT") || operation.equals("UPDATE")) {
response = scheduler.onAddSpec(spec);
// Null response means the dag failed to compile
@@ -175,7 +143,7 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
+ "invalidate the earlier compilation. Examine changes to locate
error. Response is {}", spec, response);
this.failedAddedSpecs.mark();
}
- } else if (operation == "DELETE") {
+ } else if (operation.equals("DELETE")) {
scheduler.onDeleteSpec(specAsUri, FlowSpec.Builder.DEFAULT_VERSION);
this.deletedSpecs.mark();
} else {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
index 3e873f331..901685660 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
@@ -35,7 +35,6 @@ import
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@Slf4j
public class SpecStoreChangeMonitorFactory implements
Provider<SpecStoreChangeMonitor> {
static final String SPEC_STORE_CHANGE_MONITOR_CLASS_NAME =
"org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor";
- static final String SPEC_STORE_CHANGE_MONITOR_TOPIC_KEY = "topic";
static final String SPEC_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = "numThreads";
private final Config config;
@@ -48,7 +47,7 @@ public class SpecStoreChangeMonitorFactory implements
Provider<SpecStoreChangeMo
private SpecStoreChangeMonitor createSpecStoreChangeMonitor()
throws ReflectiveOperationException {
Config specStoreChangeConfig =
config.getConfig(SpecStoreChangeMonitor.SPEC_STORE_CHANGE_MONITOR_PREFIX);
- String topic =
specStoreChangeConfig.getString(SPEC_STORE_CHANGE_MONITOR_TOPIC_KEY);
+ String topic = ""; // Pass empty string because we expect underlying
client to dynamically determine the Kafka topic
int numThreads = ConfigUtils.getInt(specStoreChangeConfig,
SPEC_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
return (SpecStoreChangeMonitor) GobblinConstructorUtils.invokeConstructor(