This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 989fcd8f9 [GOBBLIN-1754] Fixes for mysql store change monitors (#3615)
989fcd8f9 is described below

commit 989fcd8f90d5a01ceeb530a19f1eb0ee331a7506
Author: umustafi <[email protected]>
AuthorDate: Wed Dec 14 16:05:50 2022 -0800

    [GOBBLIN-1754] Fixes for mysql store change monitors (#3615)
    
    * [GOBBLIN-1754] Fixes for mysql store change monitors
    
    * remove unused imports and print action in error message
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../dag_action_store/MysqlDagActionStore.java      | 10 ++++++++--
 .../gobblin/runtime/kafka/HighLevelConsumer.java   |  2 ++
 ...lowExecutionResourceHandlerWithWarmStandby.java | 23 +++++++++++++++++++---
 .../monitoring/DagActionStoreChangeMonitor.java    | 10 ++++++----
 .../DagActionStoreChangeMonitorFactory.java        | 19 ++++++++++++------
 .../service/monitoring/SpecStoreChangeMonitor.java | 10 +++++-----
 .../monitoring/SpecStoreChangeMonitorFactory.java  | 17 ++++++++++------
 7 files changed, 65 insertions(+), 26 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
index 7600e304f..caa5ddda7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
@@ -17,8 +17,6 @@
 
 package org.apache.gobblin.runtime.dag_action_store;
 
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -26,7 +24,12 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.HashSet;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
 import javax.sql.DataSource;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.MysqlStateStore;
 import org.apache.gobblin.runtime.api.DagActionStore;
@@ -166,6 +169,9 @@ public class MysqlDagActionStore implements DagActionStore {
         result.add(
             new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), 
DagActionValue.valueOf(rs.getString(4))));
       }
+      if (rs != null) {
+        rs.close();
+      }
       return result;
     } catch (SQLException e) {
       throw new IOException(String.format("Failure get dag actions from table 
%s ", tableName), e);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index 1ce525562..7541503ee 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -139,6 +139,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
 
   protected GobblinKafkaConsumerClient createConsumerClient(Config config) {
     String kafkaConsumerClientClass = 
config.getString(CONSUMER_CLIENT_FACTORY_CLASS_KEY);
+    log.info("Creating consumer client of class {}", kafkaConsumerClientClass, 
config);
 
     try {
       Class clientFactoryClass = Class.forName(kafkaConsumerClientClass);
@@ -245,6 +246,7 @@ public abstract class HighLevelConsumer<K,V> extends AbstractIdleService {
         queues[idx].put(record);
       }
     } catch (InterruptedException e) {
+      log.warn("Exception encountered while consuming records and adding to 
queue {}", e);
       Thread.currentThread().interrupt();
     }
   }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
index 3f35152be..3919a3a7d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -30,6 +30,7 @@ import java.sql.SQLException;
 import javax.inject.Named;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
 import org.apache.gobblin.service.modules.core.GobblinServiceManager;
@@ -53,8 +54,16 @@ public class GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends G
     String flowName = key.getKey().getFlowName();
     Long flowExecutionId = key.getKey().getFlowExecutionId();
     try {
+      // If an existing resume or kill request is still pending then do not 
accept this request
+      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString())) {
+        DagActionStore.DagActionValue action = 
this.dagActionStore.getDagAction(flowGroup, flowName, 
flowExecutionId.toString()).getDagActionValue();
+        this.handleException(flowGroup, flowName, flowExecutionId.toString(),
+            new RuntimeException("There is already a pending action " + action 
+ " for this flow. Please wait to resubmit and wait for"
+                + " action to be completed."));
+        return;
+      }
       this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME);
-    } catch (IOException e) {
+    } catch (IOException | SQLException | SpecNotFoundException e) {
       log.warn(
           String.format("Failed to add execution resume action for flow %s %s 
%s to dag action store due to", flowGroup,
               flowName, flowExecutionId), e);
@@ -82,13 +91,21 @@ public class GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends G
     String flowName = key.getKey().getFlowName();
     Long flowExecutionId = key.getKey().getFlowExecutionId();
     try {
+      // If an existing resume or kill request is still pending then do not 
accept this request
+      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString())) {
+        DagActionStore.DagActionValue action = 
this.dagActionStore.getDagAction(flowGroup, flowName, 
flowExecutionId.toString()).getDagActionValue();
+        this.handleException(flowGroup, flowName, flowExecutionId.toString(),
+            new RuntimeException("There is already a pending " + action + " 
action for this flow. Please wait to resubmit and wait for"
+                + " action to be completed."));
+        return new UpdateResponse(HttpStatus.S_400_BAD_REQUEST);
+      }
       this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.DagActionValue.KILL);
       return new UpdateResponse(HttpStatus.S_200_OK);
-    } catch (IOException e) {
+    } catch (IOException | SQLException | SpecNotFoundException e) {
       log.warn(
           String.format("Failed to add execution delete action for flow %s %s 
%s to dag action store due to", flowGroup,
               flowName, flowExecutionId), e);
-      handleException(flowGroup, flowName, flowExecutionId.toString(), e);
+      this.handleException(flowGroup, flowName, flowExecutionId.toString(), e);
       return new UpdateResponse(HttpStatus.S_500_INTERNAL_SERVER_ERROR);
     }
   }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index d40e367ce..8aecd000a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.inject.Inject;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
@@ -65,19 +64,20 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
   protected LoadingCache<String, String>
       dagActionsSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10, 
TimeUnit.MINUTES).build(cacheLoader);
 
-  @Inject
   protected DagActionStore dagActionStore;
 
-  @Inject
   protected DagManager dagManager;
 
   // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
   // client itself to determine all Kafka related information dynamically 
rather than through the config.
-  public DagActionStoreChangeMonitor(String topic, Config config, int 
numThreads) {
+  public DagActionStoreChangeMonitor(String topic, Config config, 
DagActionStore dagActionStore, DagManager dagManager,
+      int numThreads) {
     // Differentiate group id for each host
     super(topic, config.withValue(GROUP_ID_KEY,
         ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + 
UUID.randomUUID().toString())),
         numThreads);
+    this.dagActionStore = dagActionStore;
+    this.dagManager = dagManager;
   }
 
   @Override
@@ -136,9 +136,11 @@ public class DagActionStoreChangeMonitor extends HighLevelConsumer {
     try {
       if (operation.equals("INSERT")) {
         if (dagAction.equals(DagActionStore.DagActionValue.RESUME)) {
+          log.info("Received insert dag action and about to send resume flow 
request");
           dagManager.handleResumeFlowRequest(flowGroup, 
flowName,Long.parseLong(flowExecutionId));
           this.resumesInvoked.mark();
         } else if (dagAction.equals(DagActionStore.DagActionValue.KILL)) {
+          log.info("Received insert dag action and about to send kill flow 
request");
           dagManager.handleKillFlowRequest(flowGroup, flowName, 
Long.parseLong(flowExecutionId));
           this.killsInvoked.mark();
         } else {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
index dd8cfad94..d4a0656b3 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -25,8 +25,9 @@ import javax.inject.Inject;
 import javax.inject.Provider;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
 /**
@@ -34,22 +35,28 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
  */
 @Slf4j
 public class DagActionStoreChangeMonitorFactory implements 
Provider<DagActionStoreChangeMonitor> {
-  static final String DAG_ACTION_STORE_CHANGE_MONITOR_CLASS_NAME = 
"org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor";
   static final String DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = 
"numThreads";
 
   private final Config config;
+  private DagActionStore dagActionStore;
+  private DagManager dagManager;
 
   @Inject
-  public DagActionStoreChangeMonitorFactory(Config config) { this.config = 
Objects.requireNonNull(config); }
+  public DagActionStoreChangeMonitorFactory(Config config, DagActionStore 
dagActionStore, DagManager dagManager) {
+    this.config = Objects.requireNonNull(config);
+    this.dagActionStore = dagActionStore;
+    this.dagManager = dagManager;
+  }
 
   private DagActionStoreChangeMonitor createDagActionStoreMonitor()
     throws ReflectiveOperationException {
-    Config dagActionStoreChangeConfig = 
config.getConfig(DagActionStoreChangeMonitor.DAG_ACTION_CHANGE_MONITOR_PREFIX);
+    Config dagActionStoreChangeConfig = 
this.config.getConfig(DagActionStoreChangeMonitor.DAG_ACTION_CHANGE_MONITOR_PREFIX);
+    log.info("DagActionStore will be initialized with config {}", 
dagActionStoreChangeConfig);
+
     String topic = ""; // Pass empty string because we expect underlying 
client to dynamically determine the Kafka topic
     int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig, 
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
 
-    return (DagActionStoreChangeMonitor) 
GobblinConstructorUtils.invokeConstructor(
-        Class.forName(DAG_ACTION_STORE_CHANGE_MONITOR_CLASS_NAME), topic, 
dagActionStoreChangeConfig, numThreads);
+    return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig, 
this.dagActionStore, this.dagManager, numThreads);
   }
 
   @Override
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index 7b3d0c75d..1e834c76d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -27,7 +27,6 @@ import org.apache.commons.text.StringEscapeUtils;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.inject.Inject;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
@@ -70,19 +69,19 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer {
   protected LoadingCache<String, String>
       specChangesSeenCache = CacheBuilder.newBuilder().expireAfterWrite(10, 
TimeUnit.MINUTES).build(cacheLoader);
 
-  @Inject
   protected FlowCatalog flowCatalog;
 
-  @Inject
   protected GobblinServiceJobScheduler scheduler;
 
   // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
   // client itself to determine all Kafka related information dynamically 
rather than through the config.
-  public SpecStoreChangeMonitor(String topic, Config config, int numThreads) {
+  public SpecStoreChangeMonitor(String topic, Config config, FlowCatalog 
flowCatalog, GobblinServiceJobScheduler scheduler, int numThreads) {
     // Differentiate group id for each host
     super(topic, config.withValue(GROUP_ID_KEY,
         ConfigValueFactory.fromAnyRef(SPEC_STORE_CHANGE_MONITOR_PREFIX + 
UUID.randomUUID().toString())),
         numThreads);
+    this.flowCatalog = flowCatalog;
+    this.scheduler = scheduler;
   }
 
   @Override
@@ -106,7 +105,7 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer {
     Long timestamp = value.getTimestamp();
     String operation = value.getOperationType().name();
 
-    log.debug("Processing message where specUri is {} timestamp is {} 
operation is {}", key, timestamp, operation);
+    log.debug("Processing message where specUri is {} timestamp: {} operation: 
{}", key, timestamp, operation);
 
     String changeIdentifier = timestamp + key;
     if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, 
specChangesSeenCache, operation,
@@ -146,6 +145,7 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer {
           this.failedAddedSpecs.mark();
         }
       } else if (operation.equals("DELETE")) {
+        log.info("Deleting spec {} after receiving spec store change event", 
specAsUri);
         scheduler.onDeleteSpec(specAsUri, FlowSpec.Builder.DEFAULT_VERSION);
         this.deletedSpecs.mark();
       } else {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
index 901685660..f63fa6624 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
@@ -25,8 +25,9 @@ import javax.inject.Inject;
 import javax.inject.Provider;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
 /**
@@ -34,24 +35,28 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
  */
 @Slf4j
 public class SpecStoreChangeMonitorFactory implements 
Provider<SpecStoreChangeMonitor> {
-  static final String SPEC_STORE_CHANGE_MONITOR_CLASS_NAME = 
"org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor";
   static final String SPEC_STORE_CHANGE_MONITOR_NUM_THREADS_KEY = "numThreads";
 
   private final Config config;
+  private FlowCatalog flowCatalog;
+  private GobblinServiceJobScheduler scheduler;
 
   @Inject
-  public SpecStoreChangeMonitorFactory(Config config) {
+  public SpecStoreChangeMonitorFactory(Config config,FlowCatalog flowCatalog, 
GobblinServiceJobScheduler scheduler) {
     this.config = Objects.requireNonNull(config);
+    this.flowCatalog = flowCatalog;
+    this.scheduler = scheduler;
   }
 
   private SpecStoreChangeMonitor createSpecStoreChangeMonitor()
       throws ReflectiveOperationException {
-    Config specStoreChangeConfig = 
config.getConfig(SpecStoreChangeMonitor.SPEC_STORE_CHANGE_MONITOR_PREFIX);
+    Config specStoreChangeConfig = 
this.config.getConfig(SpecStoreChangeMonitor.SPEC_STORE_CHANGE_MONITOR_PREFIX);
+    log.info("SpecStoreChangeMonitor will be initialized with config {}", 
specStoreChangeConfig);
+
     String topic = ""; // Pass empty string because we expect underlying 
client to dynamically determine the Kafka topic
     int numThreads = ConfigUtils.getInt(specStoreChangeConfig, 
SPEC_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
 
-    return (SpecStoreChangeMonitor) GobblinConstructorUtils.invokeConstructor(
-        Class.forName(SPEC_STORE_CHANGE_MONITOR_CLASS_NAME), topic, 
specStoreChangeConfig, numThreads);
+    return new SpecStoreChangeMonitor(topic, specStoreChangeConfig, 
this.flowCatalog, this.scheduler, numThreads);
   }
 
   @Override

Reply via email to