This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a21428251c [GOBBLIN-2131] process all dag actions during startup in parallel (#4024)
a21428251c is described below
commit a21428251cdbe265cfe2b5c6398c1cae23a69b7f
Author: pratapaditya04 <[email protected]>
AuthorDate: Thu Aug 22 02:32:11 2024 +0530
[GOBBLIN-2131] process all dag actions during startup in parallel (#4024)
* process all dag actions during startup in parallel
* addressed PR comments
---
.../gobblin/configuration/ConfigurationKeys.java | 3 +++
.../monitoring/DagActionStoreChangeMonitor.java | 24 +++++++++++++++++++---
2 files changed, 24 insertions(+), 3 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index b6f37c315f..bc9d6c8047 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -99,6 +99,9 @@ public class ConfigurationKeys {
public static final String MYSQL_DAG_ACTION_STORE_PREFIX =
"MysqlDagActionStore.";
public static final String
MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY =
MYSQL_DAG_ACTION_STORE_PREFIX + "retentionPeriodSeconds";
public static final long
DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 *
60; // (3 days in seconds)
+ // Dag Action Store Change Monitor configuration
+ public static final String
DAG_ACTION_STORE_MONITOR_EXECUTOR_THREADS="dagActionStoreChangeMonitor.executor.numThreads";
+ public static final String
DAG_ACTION_STORE_MONITOR_EXECUTOR_TIMEOUT_SECONDS="dagActionStoreChangeMonitor.executor.timeout.seconds";
// Scheduler lease determination store configuration
public static final String SCHEDULER_LEASE_ARBITER_NAME =
"SchedulerFlowLaunchLeaseArbiter";
public static final String PROCESSING_LEASE_ARBITER_NAME =
"DagActionProcessingLeaseArbiter";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index bb95d2481f..0a618ecbd8 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -22,6 +22,8 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
@@ -29,9 +31,11 @@ import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.base.Optional;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
+
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
@@ -52,6 +56,8 @@ import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
/**
@@ -76,7 +82,6 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer<String, DagAc
protected ContextAwareMeter heartbeatMessagesMeter;
protected ContextAwareMeter nullDagActionTypeMessagesMeter;
private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from
all partitions in one gauge
-
protected volatile Long produceToConsumeDelayValue = -1L;
protected LaunchSubmissionMetricProxy ON_STARTUP = new
NullLaunchSubmissionMetricProxy();
@@ -146,15 +151,28 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer<String, DagAc
throw new RuntimeException(String.format("Unable to retrieve dagActions
from the dagActionStore while "
+ "initializing the %s",
DagActionStoreChangeMonitor.class.getCanonicalName()), e);
}
- // TODO: make this multi-threaded to add parallelism
+
+ final ExecutorService executorService =
Executors.newFixedThreadPool(ConfigUtils.getInt(this.config,
ConfigurationKeys.DAG_ACTION_STORE_MONITOR_EXECUTOR_THREADS, 5),
ExecutorsUtils.newThreadFactory(Optional.of(log)));
+
for (DagActionStore.DagAction action : dagActions) {
try {
- handleDagAction(action, true);
+ executorService.submit(() -> handleDagAction(action, true));
} catch (Exception e) {
log.error("Unexpected error initializing from DagActionStore changes,
upon {}", action, e);
this.unexpectedErrors.mark();
}
}
+ try {
+ boolean executedSuccessfully =
executorService.awaitTermination(ConfigUtils.getInt(this.config,
ConfigurationKeys.DAG_ACTION_STORE_MONITOR_EXECUTOR_TIMEOUT_SECONDS, 30),
TimeUnit.SECONDS);
+
+ if (!executedSuccessfully) {
+ log.error("Executor terminated before processing all actions during
startup,consider increasing the timeOut during awaitTermination");
+ this.unexpectedErrors.mark();
+ }
+ } catch (InterruptedException ignored) {
+ log.error("Interrupted Exception in processing dag actions during
startup,ignoring", ignored);
+ this.unexpectedErrors.mark();
+ }
}
/*