This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 989fcd8f9 [GOBBLIN-1754] Fixes for mysql store change monitors (#3615)
989fcd8f9 is described below
commit 989fcd8f90d5a01ceeb530a19f1eb0ee331a7506
Author: umustafi <[email protected]>
AuthorDate: Wed Dec 14 16:05:50 2022 -0800
[GOBBLIN-1754] Fixes for mysql store change monitors (#3615)
* [GOBBLIN-1754] Fixes for mysql store change monitors
* remove unused imports and print action in error message
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../dag_action_store/MysqlDagActionStore.java | 10 ++++++++--
.../gobblin/runtime/kafka/HighLevelConsumer.java | 2 ++
...lowExecutionResourceHandlerWithWarmStandby.java | 23 +++++++++++++++++++---
.../monitoring/DagActionStoreChangeMonitor.java | 10 ++++++----
.../DagActionStoreChangeMonitorFactory.java | 19 ++++++++++++------
.../service/monitoring/SpecStoreChangeMonitor.java | 10 +++++-----
.../monitoring/SpecStoreChangeMonitorFactory.java | 17 ++++++++++------
7 files changed, 65 insertions(+), 26 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
index 7600e304f..caa5ddda7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
@@ -17,8 +17,6 @@
package org.apache.gobblin.runtime.dag_action_store;
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -26,7 +24,12 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashSet;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
import javax.sql.DataSource;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlStateStore;
import org.apache.gobblin.runtime.api.DagActionStore;
@@ -166,6 +169,9 @@ public class MysqlDagActionStore implements DagActionStore {
result.add(
new DagAction(rs.getString(1), rs.getString(2), rs.getString(3),
DagActionValue.valueOf(rs.getString(4))));
}
+ if (rs != null) {
+ rs.close();
+ }
return result;
} catch (SQLException e) {
throw new IOException(String.format("Failure get dag actions from table
%s ", tableName), e);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index 1ce525562..7541503ee 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -139,6 +139,7 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
protected GobblinKafkaConsumerClient createConsumerClient(Config config) {
String kafkaConsumerClientClass =
config.getString(CONSUMER_CLIENT_FACTORY_CLASS_KEY);
+ log.info("Creating consumer client of class {}", kafkaConsumerClientClass,
config);
try {
Class clientFactoryClass = Class.forName(kafkaConsumerClientClass);
@@ -245,6 +246,7 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
queues[idx].put(record);
}
} catch (InterruptedException e) {
+ log.warn("Exception encountered while consuming records and adding to
queue {}", e);
Thread.currentThread().interrupt();
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
index 3f35152be..3919a3a7d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -30,6 +30,7 @@ import java.sql.SQLException;
import javax.inject.Named;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
@@ -53,8 +54,16 @@ public class
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends G
String flowName = key.getKey().getFlowName();
Long flowExecutionId = key.getKey().getFlowExecutionId();
try {
+ // If an existing resume or kill request is still pending then do not
accept this request
+ if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString())) {
+ DagActionStore.DagActionValue action =
this.dagActionStore.getDagAction(flowGroup, flowName,
flowExecutionId.toString()).getDagActionValue();
+ this.handleException(flowGroup, flowName, flowExecutionId.toString(),
+ new RuntimeException("There is already a pending action " + action
+ " for this flow. Please wait to resubmit and wait for"
+ + " action to be completed."));
+ return;
+ }
this.dagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME);
- } catch (IOException e) {
+ } catch (IOException | SQLException | SpecNotFoundException e) {
log.warn(
String.format("Failed to add execution resume action for flow %s %s
%s to dag action store due to", flowGroup,
flowName, flowExecutionId), e);
@@ -82,13 +91,21 @@ public class
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends G
String flowName = key.getKey().getFlowName();
Long flowExecutionId = key.getKey().getFlowExecutionId();
try {
+ // If an existing resume or kill request is still pending then do not
accept this request
+ if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString())) {
+ DagActionStore.DagActionValue action =
this.dagActionStore.getDagAction(flowGroup, flowName,
flowExecutionId.toString()).getDagActionValue();
+ this.handleException(flowGroup, flowName, flowExecutionId.toString(),
+ new RuntimeException("There is already a pending " + action + "
action for this flow. Please wait to resubmit and wait for"
+ + " action to be completed."));
+ return new UpdateResponse(HttpStatus.S_400_BAD_REQUEST);
+ }
this.dagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.DagActionValue.KILL);
return new UpdateResponse(HttpStatus.S_200_OK);
- } catch (IOException e) {
+ } catch (IOException | SQLException | SpecNotFoundException e) {
log.warn(
String.format("Failed to add execution delete action for flow %s %s
%s to dag action store due to", flowGroup,
flowName, flowExecutionId), e);
- handleException(flowGroup, flowName, flowExecutionId.toString(), e);
+ this.handleException(flowGroup, flowName, flowExecutionId.toString(), e);
return new UpdateResponse(HttpStatus.S_500_INTERNAL_SERVER_ERROR);
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index d40e367ce..8aecd000a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import com.google.inject.Inject;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
@@ -65,19 +64,20 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
protected LoadingCache<String, String>
dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10,
TimeUnit.MINUTES).build(cacheLoader);
- @Inject
protected DagActionStore dagActionStore;
- @Inject
protected DagManager dagManager;
// Note that the topic is an empty string (rather than null to avoid NPE)
because this monitor relies on the consumer
// client itself to determine all Kafka related information dynamically
rather than through the config.
- public DagActionStoreChangeMonitor(String topic, Config config, int
numThreads) {
+ public DagActionStoreChangeMonitor(String topic, Config config,
DagActionStore dagActionStore, DagManager dagManager,
+ int numThreads) {
// Differentiate group id for each host
super(topic, config.withValue(GROUP_ID_KEY,
ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX +
UUID.randomUUID().toString())),
numThreads);
+ this.dagActionStore = dagActionStore;
+ this.dagManager = dagManager;
}
@Override
@@ -136,9 +136,11 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
try {
if (operation.equals("INSERT")) {
if (dagAction.equals(DagActionStore.DagActionValue.RESUME)) {
+ log.info("Received insert dag action and about to send resume flow
request");
dagManager.handleResumeFlowRequest(flowGroup,
flowName,Long.parseLong(flowExecutionId));
this.resumesInvoked.mark();
} else if (dagAction.equals(DagActionStore.DagActionValue.KILL)) {
+ log.info("Received insert dag action and about to send kill flow
request");
dagManager.handleKillFlowRequest(flowGroup, flowName,
Long.parseLong(flowExecutionId));
this.killsInvoked.mark();
} else {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
index dd8cfad94..d4a0656b3 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -25,8 +25,9 @@ import javax.inject.Inject;
import javax.inject.Provider;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
/**
@@ -34,22 +35,28 @@ import
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
*/
@Slf4j
public class DagActionStoreChangeMonitorFactory implements
Provider<DagActionStoreChangeMonitor> {
- static final String DAG_ACTION_STORE_CHANGE_MONITOR_CLASS_NAME =
"org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor";
static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY =
"numThreads";
private final Config config;
+ private DagActionStore dagActionStore;
+ private DagManager dagManager;
@Inject
- public DagActionStoreChangeMonitorFactory(Config config) { this.config =
Objects.requireNonNull(config); }
+ public DagActionStoreChangeMonitorFactory(Config config, DagActionStore
dagActionStore, DagManager dagManager) {
+ this.config = Objects.requireNonNull(config);
+ this.dagActionStore = dagActionStore;
+ this.dagManager = dagManager;
+ }
private DagActionStoreChangeMonitor createDagActionStoreMonitor()
throws ReflectiveOperationException {
- Config dagActionStoreChangeConfig =
config.getConfig(DagActionStoreChangeMonitor.DAG_ACTION_CHANGE_MONITOR_PREFIX);
+ Config dagActionStoreChangeConfig =
this.config.getConfig(DagActionStoreChangeMonitor.DAG_ACTION_CHANGE_MONITOR_PREFIX);
+ log.info("DagActionStore will be initialized with config {}",
dagActionStoreChangeConfig);
+
String topic = ""; // Pass empty string because we expect underlying
client to dynamically determine the Kafka topic
int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig,
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
- return (DagActionStoreChangeMonitor)
GobblinConstructorUtils.invokeConstructor(
- Class.forName(DAG_ACTION_STORE_CHANGE_MONITOR_CLASS_NAME), topic,
dagActionStoreChangeConfig, numThreads);
+ return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig,
this.dagActionStore, this.dagManager, numThreads);
}
@Override
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index 7b3d0c75d..1e834c76d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -27,7 +27,6 @@ import org.apache.commons.text.StringEscapeUtils;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import com.google.inject.Inject;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
@@ -70,19 +69,19 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
protected LoadingCache<String, String>
specChangesSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10,
TimeUnit.MINUTES).build(cacheLoader);
- @Inject
protected FlowCatalog flowCatalog;
- @Inject
protected GobblinServiceJobScheduler scheduler;
// Note that the topic is an empty string (rather than null to avoid NPE)
because this monitor relies on the consumer
// client itself to determine all Kafka related information dynamically
rather than through the config.
- public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
+ public SpecStoreChangeMonitor(String topic, Config config, FlowCatalog
flowCatalog, GobblinServiceJobScheduler scheduler, int numThreads) {
// Differentiate group id for each host
super(topic, config.withValue(GROUP_ID_KEY,
ConfigValueFactory.fromAnyRef(SPEC_STORE_CHANGE_MONITOR_PREFIX +
UUID.randomUUID().toString())),
numThreads);
+ this.flowCatalog = flowCatalog;
+ this.scheduler = scheduler;
}
@Override
@@ -106,7 +105,7 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
Long timestamp = value.getTimestamp();
String operation = value.getOperationType().name();
- log.debug("Processing message where specUri is {} timestamp is {}
operation is {}", key, timestamp, operation);
+ log.debug("Processing message where specUri is {} timestamp: {} operation:
{}", key, timestamp, operation);
String changeIdentifier = timestamp + key;
if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier,
specChangesSeenCache, operation,
@@ -146,6 +145,7 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
this.failedAddedSpecs.mark();
}
} else if (operation.equals("DELETE")) {
+ log.info("Deleting spec {} after receiving spec store change event",
specAsUri);
scheduler.onDeleteSpec(specAsUri, FlowSpec.Builder.DEFAULT_VERSION);
this.deletedSpecs.mark();
} else {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
index 901685660..f63fa6624 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
@@ -25,8 +25,9 @@ import javax.inject.Inject;
import javax.inject.Provider;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
/**
@@ -34,24 +35,28 @@ import
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
*/
@Slf4j
public class SpecStoreChangeMonitorFactory implements
Provider<SpecStoreChangeMonitor> {
- static final String SPEC_STORE_CHANGE_MONITOR_CLASS_NAME =
"org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor";
static final String SPEC_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = "numThreads";
private final Config config;
+ private FlowCatalog flowCatalog;
+ private GobblinServiceJobScheduler scheduler;
@Inject
- public SpecStoreChangeMonitorFactory(Config config) {
+ public SpecStoreChangeMonitorFactory(Config config,FlowCatalog flowCatalog,
GobblinServiceJobScheduler scheduler) {
this.config = Objects.requireNonNull(config);
+ this.flowCatalog = flowCatalog;
+ this.scheduler = scheduler;
}
private SpecStoreChangeMonitor createSpecStoreChangeMonitor()
throws ReflectiveOperationException {
- Config specStoreChangeConfig =
config.getConfig(SpecStoreChangeMonitor.SPEC_STORE_CHANGE_MONITOR_PREFIX);
+ Config specStoreChangeConfig =
this.config.getConfig(SpecStoreChangeMonitor.SPEC_STORE_CHANGE_MONITOR_PREFIX);
+ log.info("SpecStoreChangeMonitor will be initialized with config {}",
specStoreChangeConfig);
+
String topic = ""; // Pass empty string because we expect underlying
client to dynamically determine the Kafka topic
int numThreads = ConfigUtils.getInt(specStoreChangeConfig,
SPEC_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
- return (SpecStoreChangeMonitor) GobblinConstructorUtils.invokeConstructor(
- Class.forName(SPEC_STORE_CHANGE_MONITOR_CLASS_NAME), topic,
specStoreChangeConfig, numThreads);
+ return new SpecStoreChangeMonitor(topic, specStoreChangeConfig,
this.flowCatalog, this.scheduler, numThreads);
}
@Override