This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8cb523dc7 [GOBBLIN-1773] Fix bugs in quota manager (#3636)
8cb523dc7 is described below
commit 8cb523dc79d61056db00a2ace4eabea58931a5ff
Author: Zihan Li <[email protected]>
AuthorDate: Thu Feb 9 09:46:15 2023 -0800
[GOBBLIN-1773] Fix bugs in quota manager (#3636)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1773] Fix bug in quota manager of gobblinservice where we
increase quota twice for run-immediately flow
* change typo
* fix unit test
* fix the deadlock issue and use a shared data source
* address comments
---------
Co-authored-by: Zihan Li <[email protected]>
---
.../org/apache/gobblin/metastore/MysqlStateStore.java | 2 +-
.../runtime/dag_action_store/MysqlDagActionStore.java | 6 ++++--
.../gobblin/runtime/spec_catalog/FlowCatalogTest.java | 8 +++++---
.../modules/flow/BaseFlowToJobSpecCompiler.java | 3 ++-
.../modules/orchestration/MysqlUserQuotaManager.java | 18 +++++++++++-------
.../service/modules/orchestration/Orchestrator.java | 13 +++++++++++++
.../modules/scheduler/GobblinServiceJobScheduler.java | 7 ++++---
.../scheduler/GobblinServiceJobSchedulerTest.java | 4 ++--
8 files changed, 42 insertions(+), 19 deletions(-)
diff --git
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index c2bc1e68a..85d5cbc5b 100644
---
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -201,7 +201,7 @@ public class MysqlStateStore<T extends State> implements
StateStore<T> {
* @param config the properties used for datasource instantiation
* @return
*/
- public static DataSource newDataSource(Config config) {
+ static DataSource newDataSource(Config config) {
HikariDataSource dataSource = new HikariDataSource();
PasswordManager passwordManager =
PasswordManager.getInstance(ConfigUtils.configToProperties(config));
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
index caa5ddda7..ce2d1b923 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
@@ -30,8 +30,9 @@ import com.typesafe.config.Config;
import javax.sql.DataSource;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
@@ -67,7 +68,8 @@ public class MysqlDagActionStore implements DagActionStore {
this.tableName = ConfigUtils.getString(config,
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
- this.dataSource = MysqlStateStore.newDataSource(config);
+ this.dataSource = MysqlDataSourceFactory.get(config,
+ SharedResourcesBrokerFactory.getImplicitBroker());;
try (Connection connection = dataSource.getConnection();
PreparedStatement createStatement =
connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
createStatement.executeUpdate();
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
index f79d8501a..3e0bb1bb1 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
@@ -111,10 +111,10 @@ public class FlowCatalogTest {
* Create FLowSpec with specified URI and SpecStore location.
*/
public static FlowSpec initFlowSpec(String specStore, URI uri, String
flowName){
- return initFlowSpec(specStore, uri, flowName, "", ConfigFactory.empty());
+ return initFlowSpec(specStore, uri, flowName, "", ConfigFactory.empty(),
false);
}
- public static FlowSpec initFlowSpec(String specStore, URI uri, String
flowName, String flowGroup, Config additionalConfigs) {
+ public static FlowSpec initFlowSpec(String specStore, URI uri, String
flowName, String flowGroup, Config additionalConfigs, boolean isAdhoc) {
Properties properties = new Properties();
properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
@@ -122,7 +122,9 @@ public class FlowCatalogTest {
properties.put("job.group", flowGroup);
properties.put("specStore.fs.dir", specStore);
properties.put("specExecInstance.capabilities", "source:destination");
- properties.put("job.schedule", "0 0 0 ? * * 2050");
+ if (!isAdhoc) {
+ properties.put("job.schedule", "0 0 0 ? * * 2050");
+ }
Config defaults = ConfigUtils.propertiesToConfig(properties);
Config config = additionalConfigs.withFallback(defaults);
SpecExecutor specExecutorInstanceProducer = new
InMemorySpecExecutor(config);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 5b06f1eb4..528b4d387 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -203,8 +203,9 @@ public abstract class BaseFlowToJobSpecCompiler implements
SpecCompiler {
}
if (FlowCatalog.isCompileSuccessful(response) &&
this.userQuotaManager.isPresent() && !flowSpec.isExplain() &&
-
(!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)
|| PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(),
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
+
!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY))
{
try {
+ // We only check quota for adhoc flow, since we don't have the
execution id for run-immediately flow
userQuotaManager.get().checkQuota(dag.getStartNodes());
} catch (IOException e) {
throw new RuntimeException(e);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
index 8f5c57e49..cdbce8e5e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
@@ -37,9 +37,10 @@ import javax.inject.Singleton;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.exception.QuotaExceededException;
-import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -63,6 +64,7 @@ public class MysqlUserQuotaManager extends
AbstractUserQuotaManager {
@Inject
public MysqlUserQuotaManager(Config config) throws IOException {
super(config);
+ log.info("Going to initialize mysqlUserQuotaManager");
Config quotaStoreConfig;
if (config.hasPath(CONFIG_PREFIX)) {
quotaStoreConfig = config.getConfig(CONFIG_PREFIX).withFallback(config);
@@ -225,10 +227,6 @@ public class MysqlUserQuotaManager extends
AbstractUserQuotaManager {
decrementJobCount(connection, proxyUserKey, CountType.USER_COUNT);
}
- String flowGroup =
- ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
ConfigurationKeys.FLOW_GROUP_KEY, "");
- decrementJobCount(connection,
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode),
CountType.FLOWGROUP_COUNT);
-
String serializedRequesters =
DagManagerUtils.getSerializedRequesterList(dagNode);
try {
for (String requester :
DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
@@ -239,6 +237,10 @@ public class MysqlUserQuotaManager extends
AbstractUserQuotaManager {
log.error("Failed to release quota for requester list " +
serializedRequesters, e);
return false;
}
+
+ String flowGroup =
+ ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
ConfigurationKeys.FLOW_GROUP_KEY, "");
+ decrementJobCount(connection,
DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode),
CountType.FLOWGROUP_COUNT);
connection.commit();
} catch (SQLException ex) {
throw new IOException(ex);
@@ -265,7 +267,8 @@ public class MysqlUserQuotaManager extends
AbstractUserQuotaManager {
String quotaStoreTableName = ConfigUtils.getString(config,
ServiceConfigKeys.QUOTA_STORE_DB_TABLE_KEY,
ServiceConfigKeys.DEFAULT_QUOTA_STORE_DB_TABLE);
- DataSource dataSource = MysqlStateStore.newDataSource(config);
+ DataSource dataSource = MysqlDataSourceFactory.get(config,
+ SharedResourcesBrokerFactory.getImplicitBroker());
return new MysqlQuotaStore(dataSource, quotaStoreTableName);
}
@@ -274,7 +277,8 @@ public class MysqlUserQuotaManager extends
AbstractUserQuotaManager {
String quotaStoreTableName = ConfigUtils.getString(config,
ServiceConfigKeys.RUNNING_DAG_IDS_DB_TABLE_KEY,
ServiceConfigKeys.DEFAULT_RUNNING_DAG_IDS_DB_TABLE);
- DataSource dataSource = MysqlStateStore.newDataSource(config);
+ DataSource dataSource = MysqlDataSourceFactory.get(config,
+ SharedResourcesBrokerFactory.getImplicitBroker());;
return new RunningDagIdsStore(dataSource, quotaStoreTableName);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 9ba8abd38..7dbc55011 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -27,6 +27,7 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,6 +102,9 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
@Setter
private FlowStatusGenerator flowStatusGenerator;
+ private UserQuotaManager quotaManager;
+
+
private final ClassAliasResolver<SpecCompiler> aliasResolver;
private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
@@ -150,6 +154,8 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
}
this.flowConcurrencyFlag = ConfigUtils.getBoolean(config,
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
+ quotaManager =
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
+ ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS,
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER), config);
}
@Inject
@@ -247,6 +253,13 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
+ "concurrent executions are disabled for this flow.", flowGroup,
flowName);
conditionallyUpdateFlowGaugeSpecState(spec, CompiledState.SKIPPED);
Instrumented.markMeter(this.skippedFlowsMeter);
+ if
(!((FlowSpec)spec).getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY))
{
+ // For ad-hoc flow, we might already increase quota, we need to
decrease here
+ Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(spec);
+ for(Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
+ quotaManager.releaseQuota(dagNode);
+ }
+ }
// Send FLOW_FAILED event
Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 3f0791062..9c365e54f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -328,10 +328,11 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
return new AddSpecResponse<>(response);
}
- // Check quota limits against run immediately flows or adhoc flows before
saving the schedule
+ // Check quota limits against adhoc flows before saving the schedule
// In warm standby mode, this quota check will happen on restli API layer
when we accept the flow
- if (!this.warmStandbyEnabled &&
(!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) ||
PropertiesUtils.getPropAsBoolean(jobConfig,
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
- // This block should be reachable only for the first execution for the
adhoc flows (flows that either do not have a schedule or have
runImmediately=true.
+ if (!this.warmStandbyEnabled &&
!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+ // This block should be reachable only for the execution for the adhoc
flows
+ // For flow that has scheduler but run-immediately set to be true, we
won't check the quota as we will use a different execution id later
if (quotaManager.isPresent()) {
// QuotaManager has idempotent checks for a dagNode, so this check
won't double add quotas for a flow in the DagManager
try {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index 107892243..1c67f33e0 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -317,9 +317,9 @@ public class GobblinServiceJobSchedulerTest {
serviceLauncher.start();
FlowSpec flowSpec0 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"),
"flowName0", "group1",
-
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
ConfigValueFactory.fromAnyRef("true")));
+ ConfigFactory.empty(), true);
FlowSpec flowSpec1 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"),
"flowName1", "group1",
-
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
ConfigValueFactory.fromAnyRef("true")));
+ ConfigFactory.empty(), true);
Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
SpecCompiler mockSpecCompiler = Mockito.mock(SpecCompiler.class);