[
https://issues.apache.org/jira/browse/GOBBLIN-2136?focusedWorklogId=933266&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-933266
]
ASF GitHub Bot logged work on GOBBLIN-2136:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 04/Sep/24 23:00
Start Date: 04/Sep/24 23:00
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4031:
URL: https://github.com/apache/gobblin/pull/4031#discussion_r1744527436
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -1071,13 +1071,13 @@ public class ConfigurationKeys {
* Configuration properties related to Flows
*/
public static final String FLOW_RUN_IMMEDIATELY = "flow.runImmediately";
- public static final String GOBBLIN_FLOW_SLA_TIME = "gobblin.flow.sla.time";
- public static final String GOBBLIN_FLOW_SLA_TIME_UNIT =
"gobblin.flow.sla.timeunit";
- public static final String DEFAULT_GOBBLIN_FLOW_SLA_TIME_UNIT =
TimeUnit.MINUTES.name();
- public static final String GOBBLIN_JOB_START_SLA_TIME =
"gobblin.job.start.sla.time";
- public static final String GOBBLIN_JOB_START_SLA_TIME_UNIT =
"gobblin.job.start.sla.timeunit";
- public static final long FALLBACK_GOBBLIN_JOB_START_SLA_TIME = 10L;
- public static final String FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT =
TimeUnit.MINUTES.name();
+ public static final String GOBBLIN_FLOW_FINSIH_DEADLINE_TIME =
"gobblin.flow.deadline.time";
+ public static final String GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT =
"gobblin.flow.deadline.timeunit";
Review Comment:
looks like you renamed the variable, but not the property (to
`gobblin.flow.finish.deadline.*`)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -678,8 +457,8 @@ public static void main(String[] args) throws Exception {
@SuppressWarnings("DLS_DEAD_LOCAL_STORE")
private static void testGobblinService(GobblinServiceManager
gobblinServiceManager) {
- FlowConfigClient client =
- new FlowConfigClient(String.format("http://localhost:%s/",
gobblinServiceManager.restliServer.getPort()));
+ try (FlowConfigV2Client client =
Review Comment:
is the indentation off here - and also in the
```
catch (RemoteInvocationException | IOException e) {
```
?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java:
##########
@@ -1083,110 +281,46 @@ public void testQuotasRetryFlow() throws
URISyntaxException, IOException {
.thenReturn(jobStatusIteratorFlow1_0)
.thenReturn(jobStatusIteratorFlow1_1)
.thenReturn(jobStatusIteratorFlow1_2);
- // Dag1 is running
- this._dagManagerThread.run();
+
SortedMap<String, Counter> allCounters =
metricContext.getParent().get().getCounters();
Assert.assertEquals(allCounters.get(MetricRegistry.name(
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.SERVICE_USERS,
"user")).getCount(), 1);
- // Dag1 fails and is orchestrated again
- this._dagManagerThread.run();
- // Dag1 is running again
- this._dagManagerThread.run();
+
Assert.assertEquals(allCounters.get(MetricRegistry.name(
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.SERVICE_USERS,
"user")).getCount(), 1);
- // Dag1 is marked as complete, should be able to run the next Dag without
hitting the quota limit
- this._dagManagerThread.run();
- this.queue.offer(dagList.get(1));
- this._dagManagerThread.run();
- this._dagManagerThread.run(); // cleanup
Assert.assertEquals(allCounters.get(MetricRegistry.name(
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.SERVICE_USERS,
"user")).getCount(), 0);
- Assert.assertEquals(this.dags.size(), 0);
- Assert.assertEquals(this.jobToDag.size(), 0);
- Assert.assertEquals(this.dagToJobs.size(), 0);
}
- @Test (dependsOnMethods = "testQuotasRetryFlow")
- public void testEmitFlowMetricOnlyIfNotAdhoc() throws URISyntaxException,
IOException {
-
- Long flowId = System.currentTimeMillis();
- Dag<JobExecutionPlan> adhocDag = buildDag(String.valueOf(flowId), flowId,
"FINISH_RUNNING", 1, "proxyUser",
-
ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS,
false).build()); //Add a dag to the queue of dags
- this.queue.offer(adhocDag);
-
- Iterator<JobStatus> jobStatusIteratorFlow0_0 =
- getMockJobStatus("flow" + flowId, "group" + flowId, flowId, "job0",
"group0", String.valueOf(ExecutionStatus.COMPLETE));
- Iterator<JobStatus> jobStatusIteratorFlow0_1 =
- getMockFlowStatus("flow" + flowId, "group" + flowId, flowId,
String.valueOf(ExecutionStatus.COMPLETE));
- Iterator<JobStatus> jobStatusIteratorFlow1_0 =
- getMockJobStatus("flow" + flowId+1, "group" + flowId+1, flowId+1,
"job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
- Iterator<JobStatus> jobStatusIteratorFlow1_1 =
- getMockFlowStatus("flow" + flowId+1, "group" + flowId+1, flowId+1,
String.valueOf(ExecutionStatus.COMPLETE));
-
- Mockito.when(_jobStatusRetriever
- .getJobStatusesForFlowExecution(Mockito.eq("flow" + flowId),
Mockito.eq("group" + flowId), Mockito.anyLong(),
- Mockito.anyString(), Mockito.anyString()))
- .thenReturn(jobStatusIteratorFlow0_0)
- .thenReturn(jobStatusIteratorFlow0_1);
- Mockito.when(_jobStatusRetriever
- .getJobStatusesForFlowExecution(Mockito.eq("flow" + (flowId+1)),
Mockito.eq("group" + (flowId+1)), Mockito.anyLong(),
- Mockito.anyString(), Mockito.anyString()))
- .thenReturn(jobStatusIteratorFlow1_0)
- .thenReturn(jobStatusIteratorFlow1_1);
-
- String flowStateGaugeName0 =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group"+flowId,
- "flow"+flowId, ServiceMetricNames.RUNNING_STATUS);
-
Assert.assertNull(metricContext.getParent().get().getGauges().get(flowStateGaugeName0));
-
- Dag<JobExecutionPlan> scheduledDag = buildDag(String.valueOf(flowId+1),
flowId+1, "FINISH_RUNNING", 1, "proxyUser",
-
ConfigBuilder.create().addPrimitive(ConfigurationKeys.GOBBLIN_OUTPUT_JOB_LEVEL_METRICS,
true).build());
- this.queue.offer(scheduledDag);
- this._dagManagerThread.run();
- String flowStateGaugeName1 =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
"group"+(flowId+1),
- "flow"+(flowId+1), ServiceMetricNames.RUNNING_STATUS);
-
-
Assert.assertNotNull(metricContext.getParent().get().getGauges().get(flowStateGaugeName1));
-
- // cleanup
- this._dagManagerThread.run();
- // should be successful since it should be cleaned up with status complete
-
Assert.assertEquals(metricContext.getParent().get().getGauges().get(flowStateGaugeName1).getValue(),
DagManager.FlowState.SUCCESSFUL.value);
- Assert.assertEquals(this.dags.size(), 0);
- Assert.assertEquals(this.jobToDag.size(), 0);
- Assert.assertEquals(this.dagToJobs.size(), 0);
- }
-
- @Test (dependsOnMethods = "testEmitFlowMetricOnlyIfNotAdhoc")
- public void testJobSlaKilledMetrics() throws URISyntaxException, IOException
{
+ @Test (dependsOnMethods = "testEmitFlowMetricOnlyIfNotAdhoc", enabled =
false)
+ // todo re-write for dag proc
+ public void testJobSlaKilledMetrics() throws URISyntaxException {
Review Comment:
`testJobStartDeadlineKilledMetrics()`?
##########
gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java:
##########
@@ -203,12 +175,11 @@ public class ServiceConfigKeys {
public static final int DEFAULT_MEMORY_ISSUE_REPO_MAX_ISSUE_PER_CONTEXT= 20;
public static final String ISSUE_REPO_CLASS = GOBBLIN_SERVICE_PREFIX +
"issueRepo.class";
+ public static final String QUOTA_MANAGER_PREFIX = "UserQuotaManagerPrefix.";
public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX =
ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
- public static final String DAG_PROCESSING_ENGINE_ENABLED =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
public static final String NUM_DAG_PROC_THREADS_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY =
GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions";
-
public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
- public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED =
GOBBLIN_SERVICE_PREFIX + "multiActiveExecutionEnabled";
+ public static final long DEFAULT_FLOW_DEADLINE_MILLIS =
TimeUnit.HOURS.toMillis(24);
Review Comment:
`DEFAULT_FLOW_FINISH_...`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -433,79 +310,22 @@ private void configureServices(){
registerServicesInLauncher();
- // Register Scheduler to listen to changes in Flows
- // In warm standby mode, instead of scheduler we will add orchestrator as
listener
- if(configuration.isWarmStandbyEnabled()) {
- this.flowCatalog.addListener(this.orchestrator);
- } else if (configuration.isSchedulerEnabled()) {
- this.flowCatalog.addListener(this.scheduler);
- }
- }
-
- private void ensureInjected() {
- if (v2ResourceHandler == null) {
- throw new IllegalStateException("GobblinServiceManager should be
constructed through Guice dependency injection "
- + "or through a static factory method");
- }
+ // Register orchestrator to listen to changes in Flows
+ this.flowCatalog.addListener(this.orchestrator);
}
@Override
public void start() throws ApplicationException {
LOGGER.info("[Init] Starting the Gobblin Service Manager");
- ensureInjected();
-
configureServices();
-
- if (this.helixManager.isPresent()) {
- connectHelixManager();
- }
-
- this.eventBus.register(this);
this.serviceLauncher.start();
- if (this.helixManager.isPresent()) {
- // Subscribe to leadership changes
- this.helixManager.get().addControllerListener((ControllerChangeListener)
this::handleLeadershipChange);
-
-
- // Update for first time since there might be no notification
- if (helixManager.get().isLeader()) {
- if (configuration.isSchedulerEnabled()) {
- LOGGER.info("[Init] Gobblin Service is running in master instance
mode, enabling Scheduler.");
- this.scheduler.setActive(true);
- }
-
- if (configuration.isGitConfigMonitorEnabled()) {
- this.gitConfigMonitor.setActive(true);
- }
-
- if (helixLeaderGauges.isPresent()) {
- helixLeaderGauges.get().setState(LeaderState.MASTER);
- }
+ LOGGER.info("[Init] Gobblin Service is running in master instance mode,
enabling Scheduler.");
Review Comment:
nit: "is running in multi-active mode..."
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -413,16 +292,14 @@ private void registerServicesInLauncher(){
this.serviceLauncher.addService(restliServer);
}
- if (this.configuration.isWarmStandbyEnabled()) {
- this.serviceLauncher.addService(specStoreChangeMonitor);
- this.serviceLauncher.addService(dagActionStoreChangeMonitor);
- }
+ this.serviceLauncher.addService(specStoreChangeMonitor);
+ this.serviceLauncher.addService(_dagManagementDagActionStoreChangeMonitor);
}
private void configureServices(){
if (configuration.isRestLIServerEnabled()) {
this.restliServer = EmbeddedRestliServer.builder()
- .resources(Lists.newArrayList(FlowConfigsResource.class,
FlowConfigsV2Resource.class))
+ .resources(Lists.newArrayList(FlowConfigsV2Resource.class,
FlowConfigsV2Resource.class))
Review Comment:
did you mean to name this same resource twice?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java:
##########
@@ -290,15 +323,15 @@ public void cleanUp() throws Exception {
*/
@Test
public void testGetClass() {
-
Assert.assertTrue(GobblinServiceManager.getClass(FlowCompilationValidationHelper.class)
instanceof FlowCompilationValidationHelper);
+
Assert.assertNotNull(GobblinServiceManager.getClass(FlowCompilationValidationHelper.class));
// Optionally bound config
- Assert.assertTrue(GobblinServiceManager.getClass(FlowCatalog.class)
instanceof FlowCatalog);
+ Assert.assertNotNull(GobblinServiceManager.getClass(FlowCatalog.class));
}
/**
* To test an existing flow in a spec store does not get deleted just
because it is not compilable during service restarts
*/
- @Test
+ @Test (enabled = false)
Review Comment:
please add a comment explaining why this test is disabled
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java:
##########
@@ -943,30 +162,19 @@ public void testDagManagerQuotaExceeded() throws
URISyntaxException, IOException
.thenReturn(jobStatusIteratorFlow1_0)
.thenReturn(jobStatusIteratorFlow1_1);
- this._dagManagerThread.run();
// dag will not be processed due to exceeding the quota, will log a
message and exit out without adding it to dags
- this.queue.offer(dagList.get(1));
- this._dagManagerThread.run();
SortedMap<String, Counter> allCounters =
metricContext.getParent().get().getCounters();
Assert.assertEquals(allCounters.get(MetricRegistry.name(
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.SERVICE_USERS,
"user")).getCount(), 1);
-
- this._dagManagerThread.run(); // cleanup
-
- Assert.assertEquals(this.dags.size(), 0);
- Assert.assertEquals(this.jobToDag.size(), 0);
- Assert.assertEquals(this.dagToJobs.size(), 0);
}
- @Test (dependsOnMethods = "testDagManagerQuotaExceeded")
- public void testQuotaDecrement() throws URISyntaxException, IOException {
-
- List<Dag<JobExecutionPlan>> dagList = buildDagList(3, "user",
ConfigFactory.empty());
+ @Test (dependsOnMethods = "testDagManagerQuotaExceeded", enabled = false)
+ // todo re-write for dag proc
+ public void testQuotaDecrement() throws URISyntaxException {
+ List<Dag<JobExecutionPlan>> dagList = DagTestUtils.buildDagList(3, "user",
ConfigFactory.empty());
Review Comment:
1. do these tests for the quota manager still belong in the `DagManagerTest`
class?
2. is there a more direct way to write these tests? I'm not seeing which
method of the quota manager is invoked. Also, the test assertions are based
upon counters.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -534,17 +354,9 @@ public void start() throws ApplicationException {
//Activate the SpecCompiler, after the topologyCatalog has been
initialized.
this.orchestrator.getSpecCompiler().setActive(true);
- //Activate the DagManager service, after the topologyCatalog has been
initialized.
- if (!this.configuration.isDagProcessingEngineEnabled() &&
(!this.helixManager.isPresent() || this.helixManager.get().isLeader())){
- this.dagManager.setActive(true);
- this.eventBus.register(this.dagManager);
- }
-
- // Activate both monitors last as they're dependent on the SpecCompiler,
Scheduler, and DagManager being active
- if (configuration.isWarmStandbyEnabled()) {
- this.specStoreChangeMonitor.setActive();
- this.dagActionStoreChangeMonitor.setActive();
- }
+ // Activate both monitors last as they're dependent on the SpecCompiler
and Scheduler being active
+ this.specStoreChangeMonitor.setActive();
+ this._dagManagementDagActionStoreChangeMonitor.setActive();
Review Comment:
nit: why does the second one have a leading `_` in its name?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -156,14 +143,14 @@ public DagTask next() {
private void createJobStartDeadlineTrigger(DagActionStore.LeaseParams
leaseParams)
throws SchedulerException, IOException {
- long timeOutForJobStart =
DagManagerUtils.getJobStartSla(this.dagManagementStateStore.getDag(
- leaseParams.getDagAction().getDagId()).get().getNodes().get(0),
DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+ long timeOutForJobStart =
DagUtils.getJobStartDeadline(this.dagManagementStateStore.getDag(
+ leaseParams.getDagAction().getDagId()).get().getNodes().get(0),
DagProcessingEngine.getDefaultJobStartDeadlineTimeMillis());
Review Comment:
should this handle "invalid format" / `ConfigException`, as is done for
the flow finish deadline (below)?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java:
##########
@@ -609,15 +388,15 @@ public Collection<StandardMetrics>
getStandardMetricsCollection() {
return ImmutableList.of(this.metrics);
}
- private class Metrics extends StandardMetrics {
+ private static class Metrics extends StandardMetrics {
public static final String SERVICE_LEADERSHIP_CHANGE =
"serviceLeadershipChange";
- private ContextAwareHistogram serviceLeadershipChange;
public Metrics(final MetricContext metricContext, Config config) {
int timeWindowSizeInMinutes = ConfigUtils.getInt(config,
ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES,
ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES);
- this.serviceLeadershipChange =
metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE,
timeWindowSizeInMinutes, TimeUnit.MINUTES);
- this.contextAwareMetrics.add(this.serviceLeadershipChange);
+ ContextAwareHistogram serviceLeadershipChange =
+ metricContext.contextAwareHistogram(SERVICE_LEADERSHIP_CHANGE,
timeWindowSizeInMinutes, TimeUnit.MINUTES);
+ this.contextAwareMetrics.add(serviceLeadershipChange);
Review Comment:
does this metric still make sense?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -215,61 +182,15 @@ public void orchestrate(Spec spec, Properties jobProps,
long triggerTimestampMil
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
sharedFlowMetricsSingleton.addFlowGauge(spec, flowConfig, flowGroup,
flowName);
- /* Only compile and pass directly to `DagManager` when multi-active
scheduler NOT enabled; otherwise
- recompilation to occur later, once `DagActionStoreChangeMonitor`
subsequently delegates this
- `DagActionType.LAUNCH`
- */
- if (flowLaunchHandler.isPresent()) {
- DagActionStore.DagAction launchDagAction =
DagActionStore.DagAction.forFlow(
- flowGroup,
- flowName,
- FlowUtils.getOrCreateFlowExecutionId(flowSpec),
- DagActionStore.DagActionType.LAUNCH);
- DagActionStore.LeaseParams
- leaseObject = new DagActionStore.LeaseParams(launchDagAction,
isReminderEvent,
- triggerTimestampMillis);
- // `flowSpec.isScheduled()` ==> adopt consensus `flowExecutionId` as
clock drift safeguard, yet w/o disrupting API-layer's ad hoc ID assignment
- flowLaunchHandler.get().handleFlowLaunchTriggerEvent(jobProps,
leaseObject, flowSpec.isScheduled());
- _log.info("Multi-active scheduler finished handling trigger event:
[{}, is: {}, triggerEventTimestamp: {}]",
- launchDagAction, isReminderEvent ? "reminder" : "original",
triggerTimestampMillis);
- } else {
- try {
- TimingEvent flowCompilationTimer =
- new TimingEvent(this.eventSubmitter,
TimingEvent.FlowTimings.FLOW_COMPILED);
- Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
- Optional<Dag<JobExecutionPlan>> compiledDagOptional =
-
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
flowSpec, flowGroup,
- flowName, flowMetadata);
-
- if (!compiledDagOptional.isPresent()) {
- Instrumented.markMeter(this.flowOrchestrationFailedMeter);
- return;
- }
- Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
- if (compiledDag.isEmpty()) {
-
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
flowSpec,
- flowMetadata);
- Instrumented.markMeter(this.flowOrchestrationFailedMeter);
-
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
- SharedFlowMetricsSingleton.CompiledState.FAILED);
- _log.warn("Cannot determine an executor to run on for Spec: " +
spec);
- return;
- }
-
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
- SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
-
-
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
compiledDag);
- flowCompilationTimer.stop(flowMetadata);
-
- // Depending on if DagManager is present, handle execution
- submitFlowToDagManager(flowSpec, compiledDag);
- } finally {
- /* Remove adhoc flow spec from the flow catalog, regardless of
whether the flow was successfully validated
- and permitted to exec (concurrently)
- */
- this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
- }
- }
+ DagActionStore.DagAction launchDagAction =
DagActionStore.DagAction.forFlow(flowGroup, flowName,
+ FlowUtils.getOrCreateFlowExecutionId(flowSpec),
DagActionStore.DagActionType.LAUNCH);
+ DagActionStore.LeaseParams
+ leaseObject = new DagActionStore.LeaseParams(launchDagAction,
isReminderEvent,
+ triggerTimestampMillis);
Review Comment:
nit: don't leave the type on a line by itself; put the `leaseObject`
variable name on that line too
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -85,64 +77,39 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
protected final Logger _log;
protected final SpecCompiler specCompiler;
protected final TopologyCatalog topologyCatalog;
- protected final DagManager dagManager;
private final JobStatusRetriever jobStatusRetriever;
protected final MetricContext metricContext;
protected final EventSubmitter eventSubmitter;
- private final boolean isFlowConcurrencyEnabled;
@Getter
private Meter flowOrchestrationSuccessFulMeter;
@Getter
private Meter flowOrchestrationFailedMeter;
@Getter
private Timer flowOrchestrationTimer;
- private Counter flowFailedForwardToDagManagerCounter;
- @Setter
- private FlowStatusGenerator flowStatusGenerator;
-
- private UserQuotaManager quotaManager;
- private final FlowCompilationValidationHelper
flowCompilationValidationHelper;
- private Optional<FlowLaunchHandler> flowLaunchHandler;
- private Optional<FlowCatalog> flowCatalog;
+ private final FlowLaunchHandler flowLaunchHandler;
@Getter
private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
@Inject
- public Orchestrator(Config config, TopologyCatalog topologyCatalog,
DagManager dagManager,
- Optional<Logger> log, FlowStatusGenerator flowStatusGenerator,
Optional<FlowLaunchHandler> flowLaunchHandler,
- SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
Optional<FlowCatalog> flowCatalog,
- Optional<DagManagementStateStore> dagManagementStateStore,
+ public Orchestrator(Config config, TopologyCatalog topologyCatalog,
Optional<Logger> log, FlowLaunchHandler flowLaunchHandler,
+ SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
DagManagementStateStore dagManagementStateStore,
FlowCompilationValidationHelper flowCompilationValidationHelper,
JobStatusRetriever jobStatusRetriever) throws IOException {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.topologyCatalog = topologyCatalog;
- this.dagManager = dagManager;
- this.flowStatusGenerator = flowStatusGenerator;
this.flowLaunchHandler = flowLaunchHandler;
this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
- this.flowCatalog = flowCatalog;
this.jobStatusRetriever = jobStatusRetriever;
- this.flowCompilationValidationHelper = flowCompilationValidationHelper;
this.specCompiler = flowCompilationValidationHelper.getSpecCompiler();
- //At this point, the TopologySpecMap is initialized by the SpecCompiler.
Pass the TopologySpecMap to the DagManager.
- this.dagManager.setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
- if (dagManagementStateStore.isPresent()) {
- ((MySqlDagManagementStateStore)
dagManagementStateStore.get()).setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
- }
+ //At this point, the TopologySpecMap is initialized by the SpecCompiler.
Pass the TopologySpecMap to the DagManagementStateStore.
+ ((MySqlDagManagementStateStore)
dagManagementStateStore).setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
Review Comment:
seeing this cast makes me wonder:
a. does `setTopologySpecMap` belong in the `DagManagementStateStore`
interface?
b. OR, perhaps even better, should it be a ctor param to the
`MySqlDagManagementStateStore`? I realize there are timing interdependencies
to coordinate, but couldn't those be handled before
`MySqlDagManagementStateStore::start` is called?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -50,16 +50,15 @@
/**
- * Helper class with functionality meant to be re-used between the DagManager
and Orchestrator when launching
+ * Helper class with functionality meant to be re-used between the
LaunchDagProc and Orchestrator when launching
* executions of a flow spec. In the common case, the Orchestrator receives a
flow to orchestrate, performs necessary
- * validations, and forwards the execution responsibility to the DagManager.
The DagManager's responsibility is to
- * carry out any flow action requests. However, with launch executions now
being stored in the DagActionStateStore, on
- * restart or leadership change the DagManager has to perform validations
before executing any launch actions the
- * previous leader was unable to complete. Rather than duplicating the code or
introducing a circular dependency between
- * the DagManager and Orchestrator, this class is utilized to store the common
functionality. It is stateful,
- * requiring all stateful pieces to be passed as input from the caller upon
instantiating the helper.
- * Note: We expect further refactoring to be done to the DagManager in later
stage of multi-active development, so we do
- * not attempt major reorganization as abstractions may change.
+ * validations, and creates {@link
org.apache.gobblin.service.modules.orchestration.DagActionStore.DagAction}s
which the
+ * {@link
org.apache.gobblin.service.modules.orchestration.DagProcessingEngine} is
responsible for processing.
+ * However, with launch executions now being stored in the
DagActionStateStore, on restart, the LaunchDagProc has to
+ * perform validations before executing any launch actions the previous
LaunchDagProc was unable to complete.
+ * Rather than duplicating the code or introducing a circular dependency
between the LaunchDagProc and Orchestrator,
+ * this class is utilized to store the common functionality. It is stateful,
requiring all stateful pieces to be passed
+ * as input from the caller upon instantiating the helper.
Review Comment:
actually, should it be, "It is **stateless**, requiring all stateful pieces
to be passed as ..."?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -344,10 +309,10 @@ public void createFlowSpec() throws Throwable {
}
// Make sure FlowCatalog has the added Flow
- Assert.assertTrue(specs.size() == 1, "Spec store should contain 1 Spec
after addition");
+ Assert.assertEquals(specs.size(), 1, "Spec store should contain 1 Spec
after addition");
// Orchestrator is a no-op listener for any new FlowSpecs
-
Assert.assertTrue(((List)(sei.getProducer().get().listSpecs().get())).size() ==
0, "SpecProducer should contain 0 "
- + "Spec after addition");
+ Assert.assertEquals(sei.getProducer().get().listSpecs().get().size(), 0,
+ "SpecProducer should contain 0 " + "Spec after addition");
Review Comment:
nit: they're all on the same line, so no need to append the two halves w/ `+`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -158,7 +138,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
* e.g. There are multi-datacenter deployment of GaaS Cluster.
Intra-datacenter fail-over could be handled by
* leadership change mechanism, while inter-datacenter fail-over would be
handled by DR handling mechanism.
*/
- private boolean isNominatedDRHandler;
+ private final boolean isNominatedDRHandler;
Review Comment:
I'm not familiar w/ multi-datacenter deployment... does that play well w/
multi-active?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java:
##########
@@ -18,58 +18,232 @@
package org.apache.gobblin.service.monitoring;
import java.io.IOException;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.jetbrains.annotations.NotNull;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+import lombok.AllArgsConstructor;
+import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManagement;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
-import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
/**
- * A {@link DagActionStoreChangeMonitor} that should be used {@link
org.apache.gobblin.service.ServiceConfigKeys#DAG_PROCESSING_ENGINE_ENABLED}
- * is set.
+ * A DagActionStore change monitor that uses {@link DagActionStoreChangeEvent}
schema to process Kafka messages received
+ * from its corresponding consumer client. This monitor responds to requests
to resume or delete a flow and acts as a
+ * connector between the API and execution layers of GaaS.
*/
@Slf4j
-public class DagManagementDagActionStoreChangeMonitor extends
DagActionStoreChangeMonitor {
- private final DagManagement dagManagement;
- @VisibleForTesting @Getter
- private final DagActionReminderScheduler dagActionReminderScheduler;
+public class DagManagementDagActionStoreChangeMonitor extends
HighLevelConsumer<String, DagActionStoreChangeEvent> {
Review Comment:
my impression from our discussion
[here](https://github.com/apache/gobblin/pull/4031#discussion_r1737677287) was
that we would defer refactoring this class any more than necessary (e.g. leave
it with its same base class for now)
Issue Time Tracking
-------------------
Worklog Id: (was: 933266)
Time Spent: 3h 40m (was: 3.5h)
> remove obsolete code related to DagManager
> ------------------------------------------
>
> Key: GOBBLIN-2136
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2136
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 3h 40m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)