This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c3d1ba892 [GOBBLIN-1737] Fix bug when using mysql user quota manager
(#3595)
c3d1ba892 is described below
commit c3d1ba89249a739547c464c1bb4b43126849ee71
Author: Zihan Li <[email protected]>
AuthorDate: Tue Nov 8 11:28:14 2022 -0800
[GOBBLIN-1737] Fix bug when using mysql user quota manager (#3595)
* address comments
* use connectionmanager when httpclient is not cloesable
* [GOBBLIN-1737] Fix bug when using mysql user quota manager
* fix tests
Co-authored-by: Zihan Li <[email protected]>
---
.../service/modules/flow/BaseFlowToJobSpecCompiler.java | 15 ++++++++++-----
.../gobblin/service/modules/orchestration/DagManager.java | 3 ++-
.../modules/orchestration/MysqlUserQuotaManager.java | 14 +++++++++++---
.../modules/orchestration/MysqlUserQuotaManagerTest.java | 8 ++++----
4 files changed, 27 insertions(+), 13 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index d0e997771..86fee4a68 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -24,10 +24,10 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import javax.inject.Inject;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,8 +95,7 @@ public abstract class BaseFlowToJobSpecCompiler implements
SpecCompiler {
private boolean warmStandbyEnabled;
- @Inject
- UserQuotaManager userQuotaManager;
+ private Optional<UserQuotaManager> userQuotaManager;
public BaseFlowToJobSpecCompiler(Config config){
this(config,true);
@@ -128,6 +127,12 @@ public abstract class BaseFlowToJobSpecCompiler implements
SpecCompiler {
}
this.warmStandbyEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false);
+ if (this.warmStandbyEnabled) {
+ userQuotaManager =
Optional.of(GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
+ ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS,
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config));
+ } else {
+ userQuotaManager = Optional.absent();
+ }
this.topologySpecMap = Maps.newConcurrentMap();
this.config = config;
@@ -197,10 +202,10 @@ public abstract class BaseFlowToJobSpecCompiler
implements SpecCompiler {
response = dag.toString();
}
- if (FlowCatalog.isCompileSuccessful(response) && this.warmStandbyEnabled
&& !flowSpec.isExplain() &&
+ if (FlowCatalog.isCompileSuccessful(response) &&
this.userQuotaManager.isPresent() && !flowSpec.isExplain() &&
(!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)
|| PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(),
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
try {
- userQuotaManager.checkQuota(dag.getStartNodes());
+ userQuotaManager.get().checkQuota(dag.getStartNodes());
flowSpec.getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW,
"true");
} catch (IOException e) {
throw new RuntimeException(e);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 8532ec9b8..7a20ea945 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -390,7 +390,8 @@ public class DagManager extends AbstractIdleService {
this.dagManagerMetrics.activate();
- UserQuotaManager quotaManager = new InMemoryUserQuotaManager(config);
+ UserQuotaManager quotaManager =
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
+ ConfigUtils.getString(config,
ServiceConfigKeys.QUOTA_MANAGER_CLASS,
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config);
quotaManager.init(dagStateStore.getDags());
//On startup, the service creates DagManagerThreads that are scheduled
at a fixed rate.
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
index 1cf49c83f..93eb29e24 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
@@ -50,6 +50,7 @@ import org.apache.gobblin.util.ConfigUtils;
@Slf4j
@Singleton
public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
+ public final static String CONFIG_PREFIX= "MysqlUserQuotaManager";
public final MysqlQuotaStore quotaStore;
public final RunningDagIdsStore runningDagIds;
@@ -57,8 +58,14 @@ public class MysqlUserQuotaManager extends
AbstractUserQuotaManager {
@Inject
public MysqlUserQuotaManager(Config config) throws IOException {
super(config);
- this.quotaStore = createQuotaStore(config);
- this.runningDagIds = createRunningDagStore(config);
+ Config quotaStoreConfig;
+ if (config.hasPath(CONFIG_PREFIX)) {
+ quotaStoreConfig = config.getConfig(CONFIG_PREFIX).withFallback(config);
+ } else {
+ throw new IOException("Please specify the config for
MysqlUserQuotaManager");
+ }
+ this.quotaStore = createQuotaStore(quotaStoreConfig);
+ this.runningDagIds = createRunningDagStore(quotaStoreConfig);
}
void addDagId(Connection connection, String dagId) throws IOException {
@@ -295,7 +302,8 @@ public class MysqlUserQuotaManager extends
AbstractUserQuotaManager {
DECREASE_FLOWGROUP_COUNT_SQL = "UPDATE " + tableName + " SET
flowgroup_count=flowgroup_count-1 WHERE name = ?";
DELETE_USER_SQL = "DELETE FROM " + tableName + " WHERE name = ? AND
user_count<1 AND flowgroup_count<1";
- String createQuotaTable = "CREATE TABLE IF NOT EXISTS " + tableName + "
(name VARCHAR(20) CHARACTER SET latin1 NOT NULL, "
+ //Increase the length of name as we include the executor uri in it
+ String createQuotaTable = "CREATE TABLE IF NOT EXISTS " + tableName + "
(name VARCHAR(500) CHARACTER SET latin1 NOT NULL, "
+ "user_count INT NOT NULL DEFAULT 0, requester_count INT NOT NULL
DEFAULT 0, flowgroup_count INT NOT NULL DEFAULT 0, "
+ "PRIMARY KEY (name), " + "UNIQUE INDEX ind (name))";
try (Connection connection = dataSource.getConnection();
PreparedStatement createStatement =
connection.prepareStatement(createQuotaTable)) {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
index a8931738d..cf39adbfc 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
@@ -45,10 +45,10 @@ public class MysqlUserQuotaManagerTest {
ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
Config config = ConfigBuilder.create()
- .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY,
testDb.getJdbcUrl())
- .addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
- .addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
- .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+ .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+ .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' +
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+ .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' +
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+ .addPrimitive(MysqlUserQuotaManager.CONFIG_PREFIX + '.' +
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
.build();
this.quotaManager = new MysqlUserQuotaManager(config);