This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7d95f70e2 [GOBBLIN-2061] Fix initialization for DagProcessingEngine
(#3943)
7d95f70e2 is described below
commit 7d95f70e2dfcd6cd8feccdba4ca168663ff9c0fd
Author: umustafi <[email protected]>
AuthorDate: Mon May 6 10:55:20 2024 -0700
[GOBBLIN-2061] Fix initialization for DagProcessingEngine (#3943)
* Fix initialization for DagProcessingEngine
* Fix compile errors
* Fix dagProcessingEngineTest
* Remove unused import
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../modules/core/GobblinServiceGuiceModule.java | 5 +--
.../modules/core/GobblinServiceManager.java | 10 ++++-
.../orchestration/DagActionReminderScheduler.java | 7 ++--
.../orchestration/DagManagementTaskStreamImpl.java | 14 +++----
.../modules/orchestration/DagProcessingEngine.java | 46 +++++++++++++++++-----
.../modules/orchestration/proc/LaunchDagProc.java | 8 +++-
.../utils/FlowCompilationValidationHelper.java | 2 +
.../DagManagementTaskStreamImplTest.java | 2 +-
.../orchestration/DagProcessingEngineTest.java | 9 +++--
9 files changed, 72 insertions(+), 31 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index a53366547..5f9893aaf 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -212,10 +212,7 @@ public class GobblinServiceGuiceModule implements Module {
annotatedWith(Names.named(ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME))
.toProvider(
DagActionProcessingMultiActiveLeaseArbiterFactory.class);
- // Multi-active execution is only compatible with dagProcessingEngine
configuration
- if (serviceConfig.isMultiActiveExecutionEnabled()) {
- binder.bind(DagActionReminderScheduler.class);
- }
+ binder.bind(DagActionReminderScheduler.class);
binder.bind(DagManagement.class).to(DagManagementTaskStreamImpl.class);
binder.bind(DagTaskStream.class).to(DagManagementTaskStreamImpl.class);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 02b969a53..d0bb037b5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -93,6 +93,7 @@ import org.apache.gobblin.service.Schedule;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
@@ -221,6 +222,9 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
@Inject(optional = true)
protected DagActionStoreChangeMonitor dagActionStoreChangeMonitor;
+ @Inject(optional = true)
+ protected DagProcessingEngine dagProcessingEngine;
+
@Inject
protected GobblinServiceManager(GobblinServiceConfiguration configuration)
throws Exception {
this.configuration = Objects.requireNonNull(configuration);
@@ -382,7 +386,11 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
}
}
- this.serviceLauncher.addService(dagManager);
+ if (configuration.isDagProcessingEngineEnabled()) {
+ this.serviceLauncher.addService(dagProcessingEngine);
+ } else { // the legacy DagManager is only registered when the DagProcessingEngine is disabled
+ this.serviceLauncher.addService(dagManager);
+ }
this.serviceLauncher.addService(databaseManager);
this.serviceLauncher.addService(issueRepository);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index 9334f796b..909d19f37 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -51,8 +51,8 @@ public class DagActionReminderScheduler {
@Inject
public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory)
throws SchedulerException {
- // Create a new Scheduler to be used solely for the DagProc reminders
- this.quartzScheduler =
schedulerFactory.getScheduler(DAG_ACTION_REMINDER_SCHEDULER_KEY);
+ // Creates a new Scheduler to be used solely for the DagProc reminders
+ this.quartzScheduler = schedulerFactory.getScheduler();
}
/**
@@ -91,8 +91,7 @@ public class DagActionReminderScheduler {
String flowGroup =
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
String flowId =
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
- DagActionStore.DagActionType dagActionType =
DagActionStore.DagActionType.valueOf(
- jobDataMap.getString(FLOW_ACTION_TYPE_KEY));
+ DagActionStore.DagActionType dagActionType =
(DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ",
flowName: " + flowName
+ ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 9aeab6696..e4d34aee6 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -58,7 +58,8 @@ import org.apache.gobblin.util.ConfigUtils;
* indication the lease has been completed
* ({@link LeaseAttemptStatus}) then the {@link
MultiActiveLeaseArbiter#tryAcquireLease} method will set a reminder for
* the flow action using {@link DagActionReminderScheduler} to reattempt the
lease after the current leaseholder's grant
- * would have expired.
+ * would have expired. The {@link DagActionReminderScheduler} is also used in the
non multi-active execution configuration,
+ * so that reminders are available in the single {@link DagManagementTaskStreamImpl}
case too.
* Note that if multi-active execution is NOT enabled, then all flow action
events are selected by
* {@link DagManagementTaskStreamImpl#next()} by virtue of having no other
contenders for the lease at the time
* {@link MultiActiveLeaseArbiter#tryAcquireLease} is called.
@@ -90,16 +91,16 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
in {@link GobblinServiceGuiceModule} which handles all possible
configurations */
throw new RuntimeException("DagProcessingEngine should not be enabled
without dagActionStore enabled.");
}
+ if (!dagActionReminderScheduler.isPresent()) {
+ throw new RuntimeException(String.format("DagProcessingEngine requires
%s to be instantiated.",
+ DagActionReminderScheduler.class.getSimpleName()));
+ }
this.dagActionStore = dagActionStore;
this.dagActionProcessingLeaseArbiter = dagActionProcessingLeaseArbiter;
this.dagActionReminderScheduler = dagActionReminderScheduler;
this.isMultiActiveExecutionEnabled = isMultiActiveExecutionEnabled;
MetricContext metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
this.eventSubmitter = new EventSubmitter.Builder(metricContext,
"org.apache.gobblin.service").build();
- if (this.isMultiActiveExecutionEnabled &&
!this.dagActionReminderScheduler.isPresent()) {
- throw new RuntimeException(String.format("Multi-active execution enabled
but required "
- + "instance %s is absent.",
DagActionReminderScheduler.class.getSimpleName()));
- }
}
@Override
@@ -144,9 +145,8 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
*/
private LeaseAttemptStatus retrieveLeaseStatus(DagActionStore.DagAction
dagAction)
throws IOException, SchedulerException {
- LeaseAttemptStatus leaseAttemptStatus;
// TODO: need to handle reminder events and flag them
- leaseAttemptStatus = this.dagActionProcessingLeaseArbiter
+ LeaseAttemptStatus leaseAttemptStatus =
this.dagActionProcessingLeaseArbiter
.tryAcquireLease(dagAction, System.currentTimeMillis(), false, false);
/* Schedule a reminder for the event unless the lease has been
completed to safeguard against the case where even
we, when we might become the lease owner still fail to complete
processing
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
index cbcfa3704..464abdea1 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java
@@ -20,8 +20,10 @@ package org.apache.gobblin.service.modules.orchestration;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.typesafe.config.Config;
@@ -47,44 +49,72 @@ import org.apache.gobblin.util.ExecutorsUtils;
* encapsulates all processing inside {@link
DagProc#process(DagManagementStateStore)}
*/
+@AllArgsConstructor
@Alpha
@Slf4j
@Singleton
-public class DagProcessingEngine {
+public class DagProcessingEngine extends AbstractIdleService {
@Getter private final Optional<DagTaskStream> dagTaskStream;
@Getter Optional<DagManagementStateStore> dagManagementStateStore;
+ private final Config config;
+ private final Optional<DagProcFactory> dagProcFactory;
+ private ScheduledExecutorService scheduledExecutorPool;
+ private static final Integer TERMINATION_TIMEOUT = 30;
@Inject
public DagProcessingEngine(Config config, Optional<DagTaskStream>
dagTaskStream,
Optional<DagProcFactory> dagProcFactory,
Optional<DagManagementStateStore> dagManagementStateStore) {
+ this.config = config;
+ this.dagProcFactory = dagProcFactory;
+ this.dagTaskStream = dagTaskStream;
+ this.dagManagementStateStore = dagManagementStateStore;
+ if (!dagTaskStream.isPresent() || !dagProcFactory.isPresent() ||
!dagManagementStateStore.isPresent()) {
+ throw new RuntimeException(String.format("DagProcessingEngine cannot be
initialized without all of the following "
+ + "classes present. DagTaskStream is %s, DagProcFactory is %s,
DagManagementStateStore is %s",
+ this.dagTaskStream.isPresent() ? "present" : "MISSING",
+ this.dagProcFactory.isPresent() ? "present" : "MISSING",
+ this.dagManagementStateStore.isPresent() ? "present" : "MISSING"));
+ }
+ log.info("DagProcessingEngine initialized.");
+ }
+
+ @Override
+ protected void startUp() {
Integer numThreads = ConfigUtils.getInt
(config, ServiceConfigKeys.NUM_DAG_PROC_THREADS_KEY,
ServiceConfigKeys.DEFAULT_NUM_DAG_PROC_THREADS);
- ScheduledExecutorService scheduledExecutorPool =
+ this.scheduledExecutorPool =
Executors.newScheduledThreadPool(numThreads,
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
com.google.common.base.Optional.of("DagProcessingEngineThread")));
- this.dagTaskStream = dagTaskStream;
- this.dagManagementStateStore = dagManagementStateStore;
-
for (int i=0; i < numThreads; i++) {
// todo - set metrics for count of active DagProcEngineThread
DagProcEngineThread dagProcEngineThread = new
DagProcEngineThread(dagTaskStream.get(), dagProcFactory.get(),
- dagManagementStateStore.get());
- scheduledExecutorPool.submit(dagProcEngineThread);
+ dagManagementStateStore.get(), i);
+ this.scheduledExecutorPool.submit(dagProcEngineThread);
}
}
+ @Override
+ protected void shutDown()
+ throws Exception {
+ log.info("DagProcessingEngine shutting down.");
+ this.scheduledExecutorPool.shutdown();
+ this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT,
TimeUnit.SECONDS);
+ }
+
@AllArgsConstructor
@VisibleForTesting
static class DagProcEngineThread implements Runnable {
private DagTaskStream dagTaskStream;
private DagProcFactory dagProcFactory;
private DagManagementStateStore dagManagementStateStore;
+ private final int threadID;
@Override
public void run() {
+ log.info("Starting DagProcEngineThread to process dag tasks. Thread
id: {}", threadID);
while (true) {
DagTask dagTask = dagTaskStream.next(); // blocking call
if (dagTask == null) {
//todo - add a metrics to count the times dagTask was null
@@ -100,8 +130,6 @@ public class DagProcessingEngine {
log.error("DagProcEngineThread encountered exception while
processing dag " + dagProc.getDagId(), e);
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
}
- // todo mark lease success and releases it
- //dagTaskStream.complete(dagTask);
}
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index 482cb563d..b52eb3435 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -63,8 +63,12 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
try {
FlowSpec flowSpec =
dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(getDagId().getFlowId()));
flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
getDagId().getFlowExecutionId());
- return
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
- } catch (URISyntaxException | SpecNotFoundException | InterruptedException
e) {
+ Optional<Dag<JobExecutionPlan>> dag =
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
+ if (dag.isPresent()) {
+ dagManagementStateStore.checkpointDag(dag.get());
+ }
+ return dag;
+ } catch (URISyntaxException | SpecNotFoundException | InterruptedException
| IOException e) {
throw new RuntimeException(e);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 301e3e436..af90d5b7b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.reflect.ConstructorUtils;
import com.google.common.base.Optional;
import com.google.inject.Inject;
+import com.google.inject.Singleton;
import com.typesafe.config.Config;
import lombok.Data;
@@ -62,6 +63,7 @@ import org.apache.gobblin.util.ConfigUtils;
*/
@Slf4j
@Data
+@Singleton
public class FlowCompilationValidationHelper {
private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
@Getter
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index ff75d906a..ec9377cec 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -82,7 +82,7 @@ public class DagManagementTaskStreamImplTest {
false);
this.dagProcFactory = new DagProcFactory(null);
this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
- this.dagManagementTaskStream, this.dagProcFactory,
dagManagementStateStore);
+ this.dagManagementTaskStream, this.dagProcFactory,
dagManagementStateStore, 0);
}
/* This tests adding and removal of dag actions from dag task stream with a
launch task. It verifies that the
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 077c5dd90..3f967dbd6 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -91,16 +91,17 @@ public class DagProcessingEngineTest {
this.dagManagementStateStore.setTopologySpecMap(topologySpecMap);
this.dagManagementTaskStream =
new DagManagementTaskStreamImpl(config,
Optional.of(mock(DagActionStore.class)),
- mock(MultiActiveLeaseArbiter.class), Optional.empty(), false);
+ mock(MultiActiveLeaseArbiter.class),
Optional.of(mock(DagActionReminderScheduler.class)), false);
this.dagProcFactory = new DagProcFactory(null);
DagProcessingEngine.DagProcEngineThread dagProcEngineThread =
new
DagProcessingEngine.DagProcEngineThread(this.dagManagementTaskStream,
this.dagProcFactory,
- dagManagementStateStore);
+ dagManagementStateStore, 0);
this.dagTaskStream = spy(new MockedDagTaskStream());
DagProcessingEngine dagProcessingEngine =
new DagProcessingEngine(config, Optional.ofNullable(dagTaskStream),
Optional.ofNullable(this.dagProcFactory),
Optional.ofNullable(dagManagementStateStore));
+ dagProcessingEngine.startAsync();
}
static class MockedDagTaskStream implements DagTaskStream {
@@ -171,6 +172,7 @@ public class DagProcessingEngineTest {
@Test
public void dagProcessingTest()
throws InterruptedException, TimeoutException, IOException {
+
// there are MAX_NUM_OF_TASKS dag tasks returned and then each thread
additionally call (infinitely) once to wait
// in this unit tests, it does not infinitely wait though, because the
mocked task stream throws an exception on
// (MAX_NUM_OF_TASKS + 1) th call
@@ -178,7 +180,8 @@ public class DagProcessingEngineTest {
int expectedExceptions = MockedDagTaskStream.MAX_NUM_OF_TASKS /
MockedDagTaskStream.FAILING_DAGS_FREQUENCY;
AssertWithBackoff.assertTrue(input ->
Mockito.mockingDetails(this.dagTaskStream).getInvocations().size() ==
expectedNumOfInvocations,
- 10000L, "dagTaskStream was not called " + expectedNumOfInvocations + "
number of times",
+ 10000L, "dagTaskStream was not called " + expectedNumOfInvocations + "
number of times. "
+ + "Actual number of invocations " +
Mockito.mockingDetails(this.dagTaskStream).getInvocations().size(),
log, 1, 1000L);
Assert.assertEquals(this.dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(),
expectedExceptions);