This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2b2769f99 [GOBBLIN-2067] do not try to get dags during
MostlyMySqlDagManagementStateStore initialization (#3947)
2b2769f99 is described below
commit 2b2769f999582e2ed33c9201e7d4812ced672880
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed May 15 14:44:59 2024 -0700
[GOBBLIN-2067] do not try to get dags during
MostlyMySqlDagManagementStateStore initialization (#3947)
* initialize topologies before the orchestrator / DMSS and any other objects
are created, because they require topologies
* create setter method for GobblinServiceManager#GOBBLIN_SERVICE_GUICE_MODULE
* address review comments
---
.../service/modules/core/GobblinServiceGuiceModule.java | 1 -
.../gobblin/service/modules/core/GobblinServiceManager.java | 11 +++++++----
.../orchestration/MostlyMySqlDagManagementStateStore.java | 6 +++++-
.../service/modules/orchestration/MysqlDagActionStore.java | 2 +-
.../org/apache/gobblin/service/GobblinServiceManagerTest.java | 4 ++--
5 files changed, 15 insertions(+), 9 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 5f9893aaf..b4d58e35e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -238,7 +238,6 @@ public class GobblinServiceGuiceModule implements Module {
binder.bind(SharedFlowMetricsSingleton.class);
binder.bind(FlowCompilationValidationHelper.class);
- OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class);
binder.bind(TopologyCatalog.class);
if (serviceConfig.isTopologySpecFactoryEnabled()) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index d0bb037b5..0b8a09843 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -117,7 +117,7 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
public static final String SERVICE_EVENT_BUS_NAME =
"GobblinServiceManagerEventBus";
private static final Logger LOGGER =
LoggerFactory.getLogger(GobblinServiceManager.class);
- private static volatile GobblinServiceGuiceModule
GOBBLIN_SERVICE_GUICE_MODULE;
+ @Setter private static volatile GobblinServiceGuiceModule
GOBBLIN_SERVICE_GUICE_MODULE;
protected final ServiceBasedAppLauncher serviceLauncher;
private volatile boolean stopInProgress = false;
@@ -265,7 +265,8 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
}
/**
- *
+ * If {@link GobblinServiceManager} is created using guice, user should set
{@link GobblinServiceManager#GOBBLIN_SERVICE_GUICE_MODULE}
+ * for this method to work.
* @param classToGet
* @return a new object if the class type is not marked with @Singleton,
otherwise the same instance of the class
* @param <T>
@@ -339,7 +340,9 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
// TODO: surround by try/catch to disconnect from Helix and fail the
leader transition if DagManager is not
// transitioned properly
//Activate DagManager only if TopologyCatalog is initialized. If not;
skip activation.
- if (this.topologyCatalog.getInitComplete().getCount() == 0) {
+ // Also skip activation to avoid starting dag manager threads when dag
proc engine is enabled, because
+ // a) dag manager threads are not required, b) they create unnecessary
GTEs which may interfere with jobs' execution
+ if (!this.configuration.isDagProcessingEngineEnabled() &&
this.topologyCatalog.getInitComplete().getCount() == 0) {
this.dagManager.setActive(true);
this.eventBus.register(this.dagManager);
}
@@ -530,7 +533,7 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
this.orchestrator.getSpecCompiler().setActive(true);
//Activate the DagManager service, after the topologyCatalog has been
initialized.
- if (!this.helixManager.isPresent() || this.helixManager.get().isLeader()){
+ if (!this.configuration.isDagProcessingEngineEnabled() &&
(!this.helixManager.isPresent() || this.helixManager.get().isLeader())){
this.dagManager.setActive(true);
this.eventBus.register(this.dagManager);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index ae5b5d79f..7f9498764 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -97,7 +97,11 @@ public class MostlyMySqlDagManagementStateStore implements
DagManagementStateSto
this.dagStateStore = createDagStateStore(config, topologySpecMap);
this.failedDagStateStore =
createDagStateStore(ConfigUtils.getConfigOrEmpty(config,
FAILED_DAG_STATESTORE_PREFIX).withFallback(config),
topologySpecMap);
- initQuota(getDags());
+ // This implementation does not need to update quota usage when the
service restarts or when its leadership status
+ // changes, because quota usage is persisted in a MySQL table. For the
same reason, there is no need to call getDags either.
+ // Also, calling getDags during startup may fail, because the topologies
that are required to deserialize dags may
+ // not have been added to the topology catalog yet.
+ // initQuota(getDags());
dagStoresInitialized = true;
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index 1141c0e9c..ee1fc5173 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -61,7 +61,7 @@ public class MysqlDagActionStore implements DagActionStore {
private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT
EXISTS %s (" +
"flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT
NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT
NULL, "
+ "flow_execution_id varchar(" +
ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
- + "job_name varchar(" + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH +
") NOT NULL, "
+ + "job_name varchar(" + ServiceConfigKeys.MAX_JOB_NAME_LENGTH + ") NOT
NULL, "
+ "dag_action varchar(50) NOT NULL, modified_time TIMESTAMP DEFAULT
CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP NOT NULL, "
+ "PRIMARY KEY
(flow_group,flow_name,flow_execution_id,job_name,dag_action))";
// Deletes rows older than retention time period (in seconds) to prevent
this table from growing unbounded.
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 3bae27069..670e40f80 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -277,9 +277,9 @@ public class GobblinServiceManagerTest {
*/
@Test
public void testGetClass() {
-
Assert.assertTrue(this.gobblinServiceManager.getClass(FlowCompilationValidationHelper.class)
instanceof FlowCompilationValidationHelper);
+
Assert.assertTrue(GobblinServiceManager.getClass(FlowCompilationValidationHelper.class)
instanceof FlowCompilationValidationHelper);
// Optionally bound config
- Assert.assertTrue(this.gobblinServiceManager.getClass(FlowCatalog.class)
instanceof FlowCatalog);
+ Assert.assertTrue(GobblinServiceManager.getClass(FlowCatalog.class)
instanceof FlowCatalog);
}
/**