This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c3d1ba892 [GOBBLIN-1737] Fix bug when using mysql user quota manager (#3595)
c3d1ba892 is described below

commit c3d1ba89249a739547c464c1bb4b43126849ee71
Author: Zihan Li <[email protected]>
AuthorDate: Tue Nov 8 11:28:14 2022 -0800

    [GOBBLIN-1737] Fix bug when using mysql user quota manager (#3595)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1737] Fix bug when using mysql user quota manager
    
    * fix tests
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../service/modules/flow/BaseFlowToJobSpecCompiler.java   | 15 ++++++++++-----
 .../gobblin/service/modules/orchestration/DagManager.java |  3 ++-
 .../modules/orchestration/MysqlUserQuotaManager.java      | 14 +++++++++++---
 .../modules/orchestration/MysqlUserQuotaManagerTest.java  |  8 ++++----
 4 files changed, 27 insertions(+), 13 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index d0e997771..86fee4a68 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -24,10 +24,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import javax.inject.Inject;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.quartz.CronExpression;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,8 +95,7 @@ public abstract class BaseFlowToJobSpecCompiler implements 
SpecCompiler {
 
   private boolean warmStandbyEnabled;
 
-  @Inject
-  UserQuotaManager userQuotaManager;
+  private Optional<UserQuotaManager> userQuotaManager;
 
   public BaseFlowToJobSpecCompiler(Config config){
     this(config,true);
@@ -128,6 +127,12 @@ public abstract class BaseFlowToJobSpecCompiler implements 
SpecCompiler {
     }
 
     this.warmStandbyEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false);
+    if (this.warmStandbyEnabled) {
+      userQuotaManager = 
Optional.of(GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
+          ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, 
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config));
+    } else {
+      userQuotaManager = Optional.absent();
+    }
 
     this.topologySpecMap = Maps.newConcurrentMap();
     this.config = config;
@@ -197,10 +202,10 @@ public abstract class BaseFlowToJobSpecCompiler 
implements SpecCompiler {
       response = dag.toString();
     }
 
-    if (FlowCatalog.isCompileSuccessful(response) && this.warmStandbyEnabled 
&& !flowSpec.isExplain() &&
+    if (FlowCatalog.isCompileSuccessful(response) && 
this.userQuotaManager.isPresent() && !flowSpec.isExplain() &&
         
(!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)
 || PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), 
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
       try {
-        userQuotaManager.checkQuota(dag.getStartNodes());
+        userQuotaManager.get().checkQuota(dag.getStartNodes());
         
flowSpec.getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW,
 "true");
       } catch (IOException e) {
         throw new RuntimeException(e);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 8532ec9b8..7a20ea945 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -390,7 +390,8 @@ public class DagManager extends AbstractIdleService {
 
        this.dagManagerMetrics.activate();
 
-        UserQuotaManager quotaManager = new InMemoryUserQuotaManager(config);
+        UserQuotaManager quotaManager = 
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
+            ConfigUtils.getString(config, 
ServiceConfigKeys.QUOTA_MANAGER_CLASS, 
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config);
         quotaManager.init(dagStateStore.getDags());
 
         //On startup, the service creates DagManagerThreads that are scheduled 
at a fixed rate.
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
index 1cf49c83f..93eb29e24 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
@@ -50,6 +50,7 @@ import org.apache.gobblin.util.ConfigUtils;
 @Slf4j
 @Singleton
 public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
+  public final static String CONFIG_PREFIX= "MysqlUserQuotaManager";
   public final MysqlQuotaStore quotaStore;
   public final RunningDagIdsStore runningDagIds;
 
@@ -57,8 +58,14 @@ public class MysqlUserQuotaManager extends 
AbstractUserQuotaManager {
   @Inject
   public MysqlUserQuotaManager(Config config) throws IOException {
     super(config);
-    this.quotaStore = createQuotaStore(config);
-    this.runningDagIds = createRunningDagStore(config);
+    Config quotaStoreConfig;
+    if (config.hasPath(CONFIG_PREFIX)) {
+      quotaStoreConfig = config.getConfig(CONFIG_PREFIX).withFallback(config);
+    } else {
+      throw new IOException("Please specify the config for 
MysqlUserQuotaManager");
+    }
+    this.quotaStore = createQuotaStore(quotaStoreConfig);
+    this.runningDagIds = createRunningDagStore(quotaStoreConfig);
   }
 
   void addDagId(Connection connection, String dagId) throws IOException {
@@ -295,7 +302,8 @@ public class MysqlUserQuotaManager extends 
AbstractUserQuotaManager {
       DECREASE_FLOWGROUP_COUNT_SQL = "UPDATE " + tableName + " SET 
flowgroup_count=flowgroup_count-1 WHERE name = ?";
       DELETE_USER_SQL = "DELETE FROM " + tableName + " WHERE name = ? AND 
user_count<1 AND flowgroup_count<1";
 
-      String createQuotaTable = "CREATE TABLE IF NOT EXISTS " + tableName + " 
(name VARCHAR(20) CHARACTER SET latin1 NOT NULL, "
+      //Increase the length of name as we include the executor uri in it
+      String createQuotaTable = "CREATE TABLE IF NOT EXISTS " + tableName + " 
(name VARCHAR(500) CHARACTER SET latin1 NOT NULL, "
           + "user_count INT NOT NULL DEFAULT 0, requester_count INT NOT NULL 
DEFAULT 0, flowgroup_count INT NOT NULL DEFAULT 0, "
           + "PRIMARY KEY (name), " + "UNIQUE INDEX ind (name))";
       try (Connection connection = dataSource.getConnection(); 
PreparedStatement createStatement = 
connection.prepareStatement(createQuotaTable)) {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
index a8931738d..cf39adbfc 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
@@ -45,10 +45,10 @@ public class MysqlUserQuotaManagerTest {
     ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
 
     Config config = ConfigBuilder.create()
-        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, 
testDb.getJdbcUrl())
-        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
-        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
-        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + 
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + 
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + 
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+        .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' + 
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
         .build();
 
     this.quotaManager = new MysqlUserQuotaManager(config);

Reply via email to