This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b2769f99 [GOBBLIN-2067] do not try to get dags during MostlyMySqlDagManagementStateStore initialization (#3947)
2b2769f99 is described below

commit 2b2769f999582e2ed33c9201e7d4812ced672880
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed May 15 14:44:59 2024 -0700

    [GOBBLIN-2067] do not try to get dags during MostlyMySqlDagManagementStateStore initialization (#3947)
    
    * initialize topologies before orchestrator / DMSS and any other objects are created because they require topologies
    create setter method for GobblinServiceManager#GOBBLIN_SERVICE_GUICE_MODULE
    * address review comments
---
 .../service/modules/core/GobblinServiceGuiceModule.java       |  1 -
 .../gobblin/service/modules/core/GobblinServiceManager.java   | 11 +++++++----
 .../orchestration/MostlyMySqlDagManagementStateStore.java     |  6 +++++-
 .../service/modules/orchestration/MysqlDagActionStore.java    |  2 +-
 .../org/apache/gobblin/service/GobblinServiceManagerTest.java |  4 ++--
 5 files changed, 15 insertions(+), 9 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 5f9893aaf..b4d58e35e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -238,7 +238,6 @@ public class GobblinServiceGuiceModule implements Module {
     binder.bind(SharedFlowMetricsSingleton.class);
     binder.bind(FlowCompilationValidationHelper.class);
 
-    OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class);
     binder.bind(TopologyCatalog.class);
 
     if (serviceConfig.isTopologySpecFactoryEnabled()) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index d0bb037b5..0b8a09843 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -117,7 +117,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
   public static final String SERVICE_EVENT_BUS_NAME = 
"GobblinServiceManagerEventBus";
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinServiceManager.class);
-  private static volatile GobblinServiceGuiceModule 
GOBBLIN_SERVICE_GUICE_MODULE;
+  @Setter private static volatile GobblinServiceGuiceModule 
GOBBLIN_SERVICE_GUICE_MODULE;
 
   protected final ServiceBasedAppLauncher serviceLauncher;
   private volatile boolean stopInProgress = false;
@@ -265,7 +265,8 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
   }
 
   /**
-   *
+   * If {@link GobblinServiceManager} is created using guice, user should set 
{@link GobblinServiceManager#GOBBLIN_SERVICE_GUICE_MODULE}
+   * for this method to work.
    * @param classToGet
    * @return a new object if the class type is not marked with @Singleton, 
otherwise the same instance of the class
    * @param <T>
@@ -339,7 +340,9 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
         // TODO: surround by try/catch to disconnect from Helix and fail the 
leader transition if DagManager is not
         // transitioned properly
         //Activate DagManager only if TopologyCatalog is initialized. If not; 
skip activation.
-        if (this.topologyCatalog.getInitComplete().getCount() == 0) {
+        // Also skip activation to avoid starting dag manager threads when dag 
proc engine is enabled, because
+        // a) dag manager threads are not required, b) they create unnecessary 
GTEs which may interfere with jobs' execution
+        if (!this.configuration.isDagProcessingEngineEnabled() && 
this.topologyCatalog.getInitComplete().getCount() == 0) {
           this.dagManager.setActive(true);
           this.eventBus.register(this.dagManager);
         }
@@ -530,7 +533,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     this.orchestrator.getSpecCompiler().setActive(true);
 
     //Activate the DagManager service, after the topologyCatalog has been 
initialized.
-    if (!this.helixManager.isPresent() || this.helixManager.get().isLeader()){
+    if (!this.configuration.isDagProcessingEngineEnabled() && 
(!this.helixManager.isPresent() || this.helixManager.get().isLeader())){
       this.dagManager.setActive(true);
       this.eventBus.register(this.dagManager);
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index ae5b5d79f..7f9498764 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -97,7 +97,11 @@ public class MostlyMySqlDagManagementStateStore implements 
DagManagementStateSto
       this.dagStateStore = createDagStateStore(config, topologySpecMap);
       this.failedDagStateStore = 
createDagStateStore(ConfigUtils.getConfigOrEmpty(config, 
FAILED_DAG_STATESTORE_PREFIX).withFallback(config),
           topologySpecMap);
-      initQuota(getDags());
+      // This implementation does not need to update quota usage when the 
service restarts or when its leadership status
+      // changes because quota usage are persisted in mysql table. For the 
same reason, there is no need to call getDags also.
+      // Also, calling getDags during startUp may fail, because the topologies 
that are required to deserialize dags may
+      // not have been added to the topology catalog yet.
+      // initQuota(getDags());
       dagStoresInitialized = true;
     }
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index 1141c0e9c..ee1fc5173 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -61,7 +61,7 @@ public class MysqlDagActionStore implements DagActionStore {
   private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT 
EXISTS %s (" +
   "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT 
NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT 
NULL, "
       + "flow_execution_id varchar(" + 
ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
-      + "job_name varchar(" + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + 
") NOT NULL, "
+      + "job_name varchar(" + ServiceConfigKeys.MAX_JOB_NAME_LENGTH + ") NOT 
NULL, "
       + "dag_action varchar(50) NOT NULL, modified_time TIMESTAMP DEFAULT 
CURRENT_TIMESTAMP  on update CURRENT_TIMESTAMP NOT NULL, "
       + "PRIMARY KEY 
(flow_group,flow_name,flow_execution_id,job_name,dag_action))";
   // Deletes rows older than retention time period (in seconds) to prevent 
this table from growing unbounded.
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 3bae27069..670e40f80 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -277,9 +277,9 @@ public class GobblinServiceManagerTest {
    */
   @Test
   public void testGetClass() {
-    
Assert.assertTrue(this.gobblinServiceManager.getClass(FlowCompilationValidationHelper.class)
 instanceof FlowCompilationValidationHelper);
+    
Assert.assertTrue(GobblinServiceManager.getClass(FlowCompilationValidationHelper.class)
 instanceof FlowCompilationValidationHelper);
     // Optionally bound config
-    Assert.assertTrue(this.gobblinServiceManager.getClass(FlowCatalog.class) 
instanceof FlowCatalog);
+    Assert.assertTrue(GobblinServiceManager.getClass(FlowCatalog.class) 
instanceof FlowCatalog);
   }
 
   /**

Reply via email to