This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a21428251c [GOBBLIN-2131] process all dag actions during startup 
parallely (#4024)
a21428251c is described below

commit a21428251cdbe265cfe2b5c6398c1cae23a69b7f
Author: pratapaditya04 <[email protected]>
AuthorDate: Thu Aug 22 02:32:11 2024 +0530

    [GOBBLIN-2131] process all dag actions during startup parallely (#4024)
    
    * process all dag actions during startup parallely
    * addressed PR comments
---
 .../gobblin/configuration/ConfigurationKeys.java   |  3 +++
 .../monitoring/DagActionStoreChangeMonitor.java    | 24 +++++++++++++++++++---
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index b6f37c315f..bc9d6c8047 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -99,6 +99,9 @@ public class ConfigurationKeys {
   public static final String MYSQL_DAG_ACTION_STORE_PREFIX = 
"MysqlDagActionStore.";
   public static final String 
MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY = 
MYSQL_DAG_ACTION_STORE_PREFIX + "retentionPeriodSeconds";
   public static final long 
DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 * 
60; // (3 days in seconds)
+  //  Dag Action Store Change Monitor configuration
+  public static final String 
DAG_ACTION_STORE_MONITOR_EXECUTOR_THREADS="dagActionStoreChangeMonitor.executor.numThreads";
+  public static final String 
DAG_ACTION_STORE_MONITOR_EXECUTOR_TIMEOUT_SECONDS="dagActionStoreChangeMonitor.executor.timeout.seconds";
   // Scheduler lease determination store configuration
   public static final String SCHEDULER_LEASE_ARBITER_NAME = 
"SchedulerFlowLaunchLeaseArbiter";
   public static final String PROCESSING_LEASE_ARBITER_NAME = 
"DagActionProcessingLeaseArbiter";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index bb95d2481f..0a618ecbd8 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -22,6 +22,8 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -29,9 +31,11 @@ import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.base.Optional;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
+
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.Getter;
@@ -52,6 +56,8 @@ import 
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import 
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
 
 
 /**
@@ -76,7 +82,6 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer<String, DagAc
   protected ContextAwareMeter heartbeatMessagesMeter;
   protected ContextAwareMeter nullDagActionTypeMessagesMeter;
   private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from 
all partitions in one gauge
-
   protected volatile Long produceToConsumeDelayValue = -1L;
 
   protected LaunchSubmissionMetricProxy ON_STARTUP = new 
NullLaunchSubmissionMetricProxy();
@@ -146,15 +151,28 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer<String, DagAc
       throw new RuntimeException(String.format("Unable to retrieve dagActions 
from the dagActionStore while "
           + "initializing the %s", 
DagActionStoreChangeMonitor.class.getCanonicalName()), e);
     }
-    // TODO: make this multi-threaded to add parallelism
+
+    final ExecutorService executorService = 
Executors.newFixedThreadPool(ConfigUtils.getInt(this.config, 
ConfigurationKeys.DAG_ACTION_STORE_MONITOR_EXECUTOR_THREADS, 5), 
ExecutorsUtils.newThreadFactory(Optional.of(log)));
+
     for (DagActionStore.DagAction action : dagActions) {
       try {
-        handleDagAction(action, true);
+        executorService.submit(() -> handleDagAction(action, true));
       } catch (Exception e) {
         log.error("Unexpected error initializing from DagActionStore changes, 
upon {}", action, e);
         this.unexpectedErrors.mark();
       }
     }
+    try {
+      boolean executedSuccessfully = 
executorService.awaitTermination(ConfigUtils.getInt(this.config, 
ConfigurationKeys.DAG_ACTION_STORE_MONITOR_EXECUTOR_TIMEOUT_SECONDS, 30), 
TimeUnit.SECONDS);
+
+      if (!executedSuccessfully) {
+        log.error("Executor terminated before processing all actions during 
startup,consider increasing the timeOut during awaitTermination");
+        this.unexpectedErrors.mark();
+      }
+    } catch (InterruptedException ignored) {
+      log.error("Interrupted Exception in processing dag actions during 
startup,ignoring", ignored);
+      this.unexpectedErrors.mark();
+    }
   }
 
   /*

Reply via email to