This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cb523dc7 [GOBBLIN-1773] Fix bugs in quota manager (#3636)
8cb523dc7 is described below

commit 8cb523dc79d61056db00a2ace4eabea58931a5ff
Author: Zihan Li <[email protected]>
AuthorDate: Thu Feb 9 09:46:15 2023 -0800

    [GOBBLIN-1773] Fix bugs in quota manager (#3636)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1773] Fix bug in quota manager of gobblinservice where we 
increase quota twice for run-immediately flow
    
    * fix typo
    
    * fix unit test
    
    * fix the deadlock issue and use shared data source
    
    * address comments
    
    ---------
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../org/apache/gobblin/metastore/MysqlStateStore.java  |  2 +-
 .../runtime/dag_action_store/MysqlDagActionStore.java  |  6 ++++--
 .../gobblin/runtime/spec_catalog/FlowCatalogTest.java  |  8 +++++---
 .../modules/flow/BaseFlowToJobSpecCompiler.java        |  3 ++-
 .../modules/orchestration/MysqlUserQuotaManager.java   | 18 +++++++++++-------
 .../service/modules/orchestration/Orchestrator.java    | 13 +++++++++++++
 .../modules/scheduler/GobblinServiceJobScheduler.java  |  7 ++++---
 .../scheduler/GobblinServiceJobSchedulerTest.java      |  4 ++--
 8 files changed, 42 insertions(+), 19 deletions(-)

diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index c2bc1e68a..85d5cbc5b 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -201,7 +201,7 @@ public class MysqlStateStore<T extends State> implements 
StateStore<T> {
    * @param config the properties used for datasource instantiation
    * @return
    */
-  public static DataSource newDataSource(Config config) {
+  static DataSource newDataSource(Config config) {
     HikariDataSource dataSource = new HikariDataSource();
     PasswordManager passwordManager = 
PasswordManager.getInstance(ConfigUtils.configToProperties(config));
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
index caa5ddda7..ce2d1b923 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
@@ -30,8 +30,9 @@ import com.typesafe.config.Config;
 
 import javax.sql.DataSource;
 
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
 import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
@@ -67,7 +68,8 @@ public class MysqlDagActionStore implements DagActionStore {
     this.tableName = ConfigUtils.getString(config, 
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
         ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
 
-    this.dataSource = MysqlStateStore.newDataSource(config);
+    this.dataSource = MysqlDataSourceFactory.get(config,
+        SharedResourcesBrokerFactory.getImplicitBroker());;
     try (Connection connection = dataSource.getConnection();
         PreparedStatement createStatement = 
connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
       createStatement.executeUpdate();
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
index f79d8501a..3e0bb1bb1 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
@@ -111,10 +111,10 @@ public class FlowCatalogTest {
      * Create FLowSpec with specified URI and SpecStore location.
      */
   public static FlowSpec initFlowSpec(String specStore, URI uri, String 
flowName){
-    return initFlowSpec(specStore, uri, flowName, "", ConfigFactory.empty());
+    return initFlowSpec(specStore, uri, flowName, "", ConfigFactory.empty(), 
false);
   }
 
-  public static FlowSpec initFlowSpec(String specStore, URI uri, String 
flowName, String flowGroup, Config additionalConfigs) {
+  public static FlowSpec initFlowSpec(String specStore, URI uri, String 
flowName, String flowGroup, Config additionalConfigs, boolean isAdhoc) {
     Properties properties = new Properties();
     properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
     properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
@@ -122,7 +122,9 @@ public class FlowCatalogTest {
     properties.put("job.group", flowGroup);
     properties.put("specStore.fs.dir", specStore);
     properties.put("specExecInstance.capabilities", "source:destination");
-    properties.put("job.schedule", "0 0 0 ? * * 2050");
+    if (!isAdhoc) {
+      properties.put("job.schedule", "0 0 0 ? * * 2050");
+    }
     Config defaults = ConfigUtils.propertiesToConfig(properties);
     Config config = additionalConfigs.withFallback(defaults);
     SpecExecutor specExecutorInstanceProducer = new 
InMemorySpecExecutor(config);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 5b06f1eb4..528b4d387 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -203,8 +203,9 @@ public abstract class BaseFlowToJobSpecCompiler implements 
SpecCompiler {
     }
 
     if (FlowCatalog.isCompileSuccessful(response) && 
this.userQuotaManager.isPresent() && !flowSpec.isExplain() &&
-        
(!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)
 || PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
+        
!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY))
 {
       try {
+        // We only check quota for adhoc flow, since we don't have the 
execution id for run-immediately flow
         userQuotaManager.get().checkQuota(dag.getStartNodes());
       } catch (IOException e) {
         throw new RuntimeException(e);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
index 8f5c57e49..cdbce8e5e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
@@ -37,9 +37,10 @@ import javax.inject.Singleton;
 import javax.sql.DataSource;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.exception.QuotaExceededException;
-import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
 import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -63,6 +64,7 @@ public class MysqlUserQuotaManager extends 
AbstractUserQuotaManager {
   @Inject
   public MysqlUserQuotaManager(Config config) throws IOException {
     super(config);
+    log.info("Going to initialize mysqlUserQuotaManager");
     Config quotaStoreConfig;
     if (config.hasPath(CONFIG_PREFIX)) {
       quotaStoreConfig = config.getConfig(CONFIG_PREFIX).withFallback(config);
@@ -225,10 +227,6 @@ public class MysqlUserQuotaManager extends 
AbstractUserQuotaManager {
         decrementJobCount(connection, proxyUserKey, CountType.USER_COUNT);
       }
 
-      String flowGroup =
-          ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), 
ConfigurationKeys.FLOW_GROUP_KEY, "");
-      decrementJobCount(connection, 
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), 
CountType.FLOWGROUP_COUNT);
-
       String serializedRequesters = 
DagManagerUtils.getSerializedRequesterList(dagNode);
       try {
         for (String requester : 
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
@@ -239,6 +237,10 @@ public class MysqlUserQuotaManager extends 
AbstractUserQuotaManager {
         log.error("Failed to release quota for requester list " + 
serializedRequesters, e);
         return false;
       }
+
+      String flowGroup =
+          ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), 
ConfigurationKeys.FLOW_GROUP_KEY, "");
+      decrementJobCount(connection, 
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), 
CountType.FLOWGROUP_COUNT);
       connection.commit();
     } catch (SQLException ex) {
       throw new IOException(ex);
@@ -265,7 +267,8 @@ public class MysqlUserQuotaManager extends 
AbstractUserQuotaManager {
     String quotaStoreTableName = ConfigUtils.getString(config, 
ServiceConfigKeys.QUOTA_STORE_DB_TABLE_KEY,
         ServiceConfigKeys.DEFAULT_QUOTA_STORE_DB_TABLE);
 
-    DataSource dataSource = MysqlStateStore.newDataSource(config);
+    DataSource dataSource = MysqlDataSourceFactory.get(config,
+        SharedResourcesBrokerFactory.getImplicitBroker());
 
     return new MysqlQuotaStore(dataSource, quotaStoreTableName);
   }
@@ -274,7 +277,8 @@ public class MysqlUserQuotaManager extends 
AbstractUserQuotaManager {
     String quotaStoreTableName = ConfigUtils.getString(config, 
ServiceConfigKeys.RUNNING_DAG_IDS_DB_TABLE_KEY,
         ServiceConfigKeys.DEFAULT_RUNNING_DAG_IDS_DB_TABLE);
 
-    DataSource dataSource = MysqlStateStore.newDataSource(config);
+    DataSource dataSource = MysqlDataSourceFactory.get(config,
+        SharedResourcesBrokerFactory.getImplicitBroker());;
 
     return new RunningDagIdsStore(dataSource, quotaStoreTableName);
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 9ba8abd38..7dbc55011 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -27,6 +27,7 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,6 +102,9 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
   @Setter
   private FlowStatusGenerator flowStatusGenerator;
 
+  private UserQuotaManager quotaManager;
+
+
   private final ClassAliasResolver<SpecCompiler> aliasResolver;
 
   private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
@@ -150,6 +154,8 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     }
     this.flowConcurrencyFlag = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
         ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
+    quotaManager = 
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
+        ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, 
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config);
   }
 
   @Inject
@@ -247,6 +253,13 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
             + "concurrent executions are disabled for this flow.", flowGroup, 
flowName);
         conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SKIPPED);
         Instrumented.markMeter(this.skippedFlowsMeter);
+        if 
(!((FlowSpec)spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY))
 {
+          // For ad-hoc flow, we might already increase quota, we need to 
decrease here
+          Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(spec);
+          for(Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+            quotaManager.releaseQuota(dagNode);
+          }
+        }
 
         // Send FLOW_FAILED event
         Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 3f0791062..9c365e54f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -328,10 +328,11 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
       return new AddSpecResponse<>(response);
     }
 
-    // Check quota limits against run immediately flows or adhoc flows before 
saving the schedule
+    // Check quota limits against adhoc flows before saving the schedule
     // In warm standby mode, this quota check will happen on restli API layer 
when we accept the flow
-    if (!this.warmStandbyEnabled && 
(!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || 
PropertiesUtils.getPropAsBoolean(jobConfig, 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
-      // This block should be reachable only for the first execution for the 
adhoc flows (flows that either do not have a schedule or have 
runImmediately=true.
+    if (!this.warmStandbyEnabled && 
!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+      // This block should be reachable only for the execution for the adhoc 
flows
+      // For flow that has scheduler but run-immediately set to be true, we 
won't check the quota as we will use a different execution id later
       if (quotaManager.isPresent()) {
         // QuotaManager has idempotent checks for a dagNode, so this check 
won't double add quotas for a flow in the DagManager
         try {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index 107892243..1c67f33e0 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -317,9 +317,9 @@ public class GobblinServiceJobSchedulerTest {
     serviceLauncher.start();
 
     FlowSpec flowSpec0 = 
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"), 
"flowName0", "group1",
-        
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, 
ConfigValueFactory.fromAnyRef("true")));
+        ConfigFactory.empty(), true);
     FlowSpec flowSpec1 = 
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"), 
"flowName1", "group1",
-        
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, 
ConfigValueFactory.fromAnyRef("true")));
+        ConfigFactory.empty(), true);
 
     Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
     SpecCompiler mockSpecCompiler = Mockito.mock(SpecCompiler.class);

Reply via email to