This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new be12fd043 [GOBBLIN-1983] Remove Optionals to make DagManager,
EventSubmitter, and TopologyCatalog required for GaaS operation (#3855)
be12fd043 is described below
commit be12fd043a129e4b6c6445cbe98afb050c69c62c
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Fri Jan 12 00:34:14 2024 -0800
[GOBBLIN-1983] Remove Optionals to make DagManager, EventSubmitter, and
TopologyCatalog required for GaaS operation (#3855)
Remove Optionals to make DagManager, EventSubmitter, and TopologyCatalog
required for GaaS operation
---
.../apache/gobblin/service/ServiceConfigKeys.java | 3 -
.../apache/gobblin/instrumented/Instrumented.java | 12 +-
.../gobblin/metrics/event/EventSubmitter.java | 2 +-
.../org/apache/gobblin/service/FlowStatusTest.java | 2 +-
.../service/monitoring/FlowStatusGenerator.java | 2 +-
.../service/monitoring/JobStatusRetriever.java | 37 ++---
.../monitoring/FlowStatusGeneratorTest.java | 11 +-
.../modules/core/GobblinServiceConfiguration.java | 10 --
.../modules/core/GobblinServiceGuiceModule.java | 35 ++---
.../modules/core/GobblinServiceManager.java | 41 +++---
.../service/modules/orchestration/DagManager.java | 125 +++++++---------
.../modules/orchestration/DagManagerUtils.java | 7 +-
.../modules/orchestration/Orchestrator.java | 158 ++++++---------------
.../scheduler/GobblinServiceJobScheduler.java | 54 +++----
.../utils/FlowCompilationValidationHelper.java | 26 ++--
.../service/monitoring/FsJobStatusRetriever.java | 5 +-
.../monitoring/LocalFsJobStatusRetriever.java | 3 +-
.../monitoring/MysqlJobStatusRetriever.java | 5 +-
.../gobblin/service/GobblinServiceManagerTest.java | 34 +++--
.../service/modules/core/GobblinServiceHATest.java | 11 +-
.../modules/core/GobblinServiceRedirectTest.java | 10 +-
.../modules/orchestration/DagManagerFlowTest.java | 6 +-
.../modules/orchestration/DagManagerTest.java | 2 +-
.../modules/orchestration/OrchestratorTest.java | 45 +++---
.../scheduler/GobblinServiceJobSchedulerTest.java | 21 ++-
.../monitoring/FsJobStatusRetrieverTest.java | 19 ++-
.../FsJobStatusRetrieverTestWithoutDagManager.java | 116 ---------------
.../service/monitoring/JobStatusRetrieverTest.java | 2 +-
.../monitoring/MysqlJobStatusRetrieverTest.java | 15 +-
...sqlJobStatusRetrieverTestWithoutDagManager.java | 128 -----------------
30 files changed, 286 insertions(+), 661 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 21b32b58c..2cfb6a017 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -29,7 +29,6 @@ public class ServiceConfigKeys {
public static final String GOBBLIN_ORCHESTRATOR_LISTENER_CLASS =
"org.apache.gobblin.service.modules.orchestration.Orchestrator";
// Gobblin Service Manager Keys
- public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled";
public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "scheduler.enabled";
public static final String GOBBLIN_SERVICE_INSTANCE_NAME =
GOBBLIN_SERVICE_PREFIX + "instance.name";
@@ -37,8 +36,6 @@ public class ServiceConfigKeys {
public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "restliServer.enabled";
public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY
= GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled";
- public static final String GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "dagManager.enabled";
- public static final boolean DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED =
false;
public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
public static final String GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "warmStandby.enabled";
public static final String
GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX +
"multiActiveScheduler.enabled";
diff --git
a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
index f2aa10303..33f4b56a8 100644
---
a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
+++
b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/Instrumented.java
@@ -25,8 +25,6 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-
import org.apache.commons.lang3.StringUtils;
import com.codahale.metrics.Meter;
@@ -41,6 +39,8 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
+import javax.annotation.Nonnull;
+
import org.apache.gobblin.Constructs;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
@@ -247,6 +247,10 @@ public class Instrumented implements Instrumentable,
Closeable {
});
}
+ public static void updateTimer(Timer timer, final long duration, final
TimeUnit unit) {
+ updateTimer(Optional.of(timer), duration, unit);
+ }
+
/**
* Marks a meter only if it is defined.
* @param meter an Optional<{@link com.codahale.metrics.Meter}>
@@ -255,6 +259,10 @@ public class Instrumented implements Instrumentable,
Closeable {
markMeter(meter, 1);
}
+ public static void markMeter(Meter meter) {
+ markMeter(Optional.of(meter), 1);
+ }
+
/**
* Marks a meter only if it is defined.
* @param meter an Optional<{@link com.codahale.metrics.Meter}>
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
index 891f980b7..480542560 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
@@ -179,7 +179,7 @@ public class EventSubmitter {
}
// Timestamp is set by metric context.
- this.metricContext.get().submitEvent(new GobblinTrackingEvent(0l,
this.namespace, name, finalMetadata));
+ this.metricContext.get().submitEvent(new GobblinTrackingEvent(0L,
this.namespace, name, finalMetadata));
}
}
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index c507b0310..4888a2c23 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -54,7 +54,7 @@ public class FlowStatusTest {
class TestJobStatusRetriever extends JobStatusRetriever {
protected TestJobStatusRetriever(MultiContextIssueRepository
issueRepository) {
- super(ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED,
issueRepository);
+ super(issueRepository);
}
@Override
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 8b991b985..0bf8b71ea 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -118,7 +118,7 @@ public class FlowStatusGenerator {
List<JobStatus> jobStatuses =
ImmutableList.copyOf(retainStatusOfAnyFlowOrJobMatchingTag(
jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup,
flowExecutionId), tag));
ExecutionStatus flowExecutionStatus =
-
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getDagManagerEnabled(),
jobStatuses.iterator());
+
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatuses.iterator());
return jobStatuses.iterator().hasNext()
? new FlowStatus(flowName, flowGroup, flowExecutionId,
jobStatuses.iterator(), flowExecutionStatus) : null;
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
index e5845fe37..750834092 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
@@ -19,12 +19,10 @@ package org.apache.gobblin.service.monitoring;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.base.Supplier;
@@ -60,15 +58,12 @@ public abstract class JobStatusRetriever implements
LatestFlowExecutionIdTracker
@Getter
protected final MetricContext metricContext;
- @Getter
- protected final Boolean dagManagerEnabled;
private final MultiContextIssueRepository issueRepository;
- protected JobStatusRetriever(boolean dagManagerEnabled,
MultiContextIssueRepository issueRepository) {
+ protected JobStatusRetriever(MultiContextIssueRepository issueRepository) {
this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
this.issueRepository = Objects.requireNonNull(issueRepository);
- this.dagManagerEnabled = dagManagerEnabled;
}
public abstract Iterator<JobStatus> getJobStatusesForFlowExecution(String
flowName, String flowGroup,
@@ -186,7 +181,7 @@ public abstract class JobStatusRetriever implements
LatestFlowExecutionIdTracker
Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
).collect(Collectors.toList())));
return new FlowStatus(exec.getFlowName(), exec.getFlowGroup(),
exec.getFlowExecutionId(), jobStatuses.iterator(),
- getFlowStatusFromJobStatuses(dagManagerEnabled,
jobStatuses.iterator()));
+ getFlowStatusFromJobStatuses(jobStatuses.iterator()));
}).collect(Collectors.toList());
}
@@ -227,31 +222,15 @@ public abstract class JobStatusRetriever implements
LatestFlowExecutionIdTracker
&& jobStatus.getJobName().equals(JobStatusRetriever.NA_KEY) &&
jobStatus.getJobGroup().equals(JobStatusRetriever.NA_KEY);
}
- public static ExecutionStatus getFlowStatusFromJobStatuses(boolean
dagManagerEnabled, Iterator<JobStatus> jobStatusIterator) {
+ public static ExecutionStatus
getFlowStatusFromJobStatuses(Iterator<JobStatus> jobStatusIterator) {
ExecutionStatus flowExecutionStatus = ExecutionStatus.$UNKNOWN;
- if (dagManagerEnabled) {
- while (jobStatusIterator.hasNext()) {
- JobStatus jobStatus = jobStatusIterator.next();
- // Check if this is the flow status instead of a single job status
- if (JobStatusRetriever.isFlowStatus(jobStatus)) {
- flowExecutionStatus =
ExecutionStatus.valueOf(jobStatus.getEventName());
- }
- }
- } else {
- Set<ExecutionStatus> jobStatuses = new HashSet<>();
- while (jobStatusIterator.hasNext()) {
- JobStatus jobStatus = jobStatusIterator.next();
- // because in absence of DagManager we do not get all flow level
events, we will ignore the flow level events
- // we actually get and purely calculate flow status based on flow
statuses.
- if (!JobStatusRetriever.isFlowStatus(jobStatus)) {
- jobStatuses.add(ExecutionStatus.valueOf(jobStatus.getEventName()));
- }
+ while (jobStatusIterator.hasNext()) {
+ JobStatus jobStatus = jobStatusIterator.next();
+ // Check if this is the flow status instead of a single job status
+ if (JobStatusRetriever.isFlowStatus(jobStatus)) {
+ flowExecutionStatus =
ExecutionStatus.valueOf(jobStatus.getEventName());
}
-
- List<ExecutionStatus> statusesInDescendingSalience =
ImmutableList.of(ExecutionStatus.FAILED, ExecutionStatus.CANCELLED,
- ExecutionStatus.RUNNING, ExecutionStatus.ORCHESTRATED,
ExecutionStatus.COMPLETE);
- flowExecutionStatus =
statusesInDescendingSalience.stream().filter(jobStatuses::contains).findFirst().orElse(ExecutionStatus.$UNKNOWN);
}
return flowExecutionStatus;
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 1c5c534be..380177936 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -77,7 +77,6 @@ public class FlowStatusGeneratorTest {
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName("RUNNING").build();
jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, jobStatus3,
flowStatus).iterator();
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName,
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
- when(jobStatusRetriever.getDagManagerEnabled()).thenReturn(true);
Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
}
@@ -109,9 +108,9 @@ public class FlowStatusGeneratorTest {
// IMPORTANT: result invariants to honor - ordered by ascending flowName,
all of same flowName adjacent, therein descending flowExecutionId
// NOTE: Three copies of FlowStatus are needed for repeated use, due to
mutable, non-rewinding `Iterator FlowStatus.getJobStatusIterator`
- FlowStatus flowStatus = createFlowStatus(flowGroup, flowName1,
flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever);
- FlowStatus flowStatus2 = createFlowStatus(flowGroup, flowName1,
flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever);
- FlowStatus flowStatus3 = createFlowStatus(flowGroup, flowName1,
flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2), jobStatusRetriever);
+ FlowStatus flowStatus = createFlowStatus(flowGroup, flowName1,
flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2));
+ FlowStatus flowStatus2 = createFlowStatus(flowGroup, flowName1,
flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2));
+ FlowStatus flowStatus3 = createFlowStatus(flowGroup, flowName1,
flowExecutionId1, Arrays.asList(f1Js0, f1Js1, f1Js2));
Mockito.when(jobStatusRetriever.getFlowStatusesForFlowGroupExecutions("myFlowGroup",
2))
.thenReturn(Collections.singletonList(flowStatus),
Collections.singletonList(flowStatus2),
Collections.singletonList(flowStatus3)); // (for three invocations)
@@ -138,9 +137,9 @@ public class FlowStatusGeneratorTest {
Arrays.asList(f0jsmDep2)));
}
- private FlowStatus createFlowStatus(String flowGroup, String flowName, long
flowExecutionId, List<JobStatus> jobStatuses, JobStatusRetriever
jobStatusRetriever) {
+ private FlowStatus createFlowStatus(String flowGroup, String flowName, long
flowExecutionId, List<JobStatus> jobStatuses) {
return new FlowStatus(flowName, flowGroup, flowExecutionId,
jobStatuses.iterator(),
-
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getDagManagerEnabled(),
jobStatuses.iterator()));
+
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatuses.iterator()));
}
private JobStatus createFlowJobStatus(String flowGroup, String flowName,
long flowExecutionId, ExecutionStatus status) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
index 03081b3ba..1998b541d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
@@ -46,9 +46,6 @@ public class GobblinServiceConfiguration {
@Getter
private final boolean isMultiActiveSchedulerEnabled;
- @Getter
- private final boolean isTopologyCatalogEnabled;
-
@Getter
private final boolean isFlowCatalogEnabled;
@@ -64,9 +61,6 @@ public class GobblinServiceConfiguration {
@Getter
private final boolean isGitConfigMonitorEnabled;
- @Getter
- private final boolean isDagManagerEnabled;
-
@Getter
private final boolean isJobStatusMonitorEnabled;
@@ -93,8 +87,6 @@ public class GobblinServiceConfiguration {
this.innerConfig = Objects.requireNonNull(config, "Config cannot be null");
this.serviceWorkDir = serviceWorkDir;
- isTopologyCatalogEnabled =
- ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY, true);
isFlowCatalogEnabled =
ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY, true);
@@ -113,8 +105,6 @@ public class GobblinServiceConfiguration {
this.isMultiActiveSchedulerEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY, false);
this.isHelixManagerEnabled =
config.hasPath(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY);
- this.isDagManagerEnabled =
- ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY,
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED);
this.isJobStatusMonitorEnabled =
ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, true);
this.isSchedulerEnabled =
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index af0c3461a..e108dec14 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -19,18 +19,6 @@ package org.apache.gobblin.service.modules.core;
import java.util.Objects;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
-import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
-import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
-import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
-import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
-import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
-import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
-import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
-import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
-import
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
-import org.apache.gobblin.service.monitoring.GitConfigMonitor;
import org.apache.helix.HelixManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,13 +37,18 @@ import com.typesafe.config.Config;
import javax.inject.Singleton;
import org.apache.gobblin.restli.EmbeddedRestliServer;
+import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import
org.apache.gobblin.runtime.troubleshooter.InMemoryMultiContextIssueRepository;
import org.apache.gobblin.runtime.troubleshooter.JobIssueEventHandler;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
+import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
@@ -75,17 +68,24 @@ import
org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
import org.apache.gobblin.service.modules.db.ServiceDatabaseProvider;
import org.apache.gobblin.service.modules.db.ServiceDatabaseProviderImpl;
import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandler;
+import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandler;
+import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
import
org.apache.gobblin.service.modules.troubleshooter.MySqlMultiContextIssueRepository;
import org.apache.gobblin.service.modules.utils.HelixUtils;
-import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
+import
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitorFactory;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
+import org.apache.gobblin.service.monitoring.GitConfigMonitor;
import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
@@ -190,9 +190,7 @@ public class GobblinServiceGuiceModule implements Module {
binder.bind(SharedFlowMetricsSingleton.class);
OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class);
- if (serviceConfig.isTopologyCatalogEnabled()) {
- binder.bind(TopologyCatalog.class);
- }
+ binder.bind(TopologyCatalog.class);
if (serviceConfig.isTopologySpecFactoryEnabled()) {
binder.bind(TopologySpecFactory.class)
@@ -200,10 +198,7 @@ public class GobblinServiceGuiceModule implements Module {
ServiceConfigKeys.TOPOLOGYSPEC_FACTORY_KEY,
ServiceConfigKeys.DEFAULT_TOPOLOGY_SPEC_FACTORY));
}
- OptionalBinder.newOptionalBinder(binder, DagManager.class);
- if (serviceConfig.isDagManagerEnabled()) {
- binder.bind(DagManager.class);
- }
+ binder.bind(DagManager.class);
OptionalBinder.newOptionalBinder(binder, HelixManager.class);
if (serviceConfig.isHelixManagerEnabled()) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 9212eae91..737c13876 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -31,9 +31,6 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.ObjectUtils;
-import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
-import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
-import org.apache.gobblin.service.monitoring.GitConfigMonitor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -97,9 +94,12 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.db.ServiceDatabaseManager;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
+import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.GitConfigMonitor;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
import org.apache.gobblin.service.monitoring.SpecStoreChangeMonitor;
import org.apache.gobblin.util.ConfigUtils;
@@ -188,9 +188,10 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
@Inject(optional = true)
protected GitConfigMonitor gitConfigMonitor;
- @Inject(optional = true)
+ @Inject
@Getter
- protected DagManager dagManager;
+ @VisibleForTesting
+ public DagManager dagManager;
@Inject(optional = true)
protected KafkaJobStatusMonitor jobStatusMonitor;
@@ -317,12 +318,10 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
// TODO: surround by try/catch to disconnect from Helix and fail the
leader transition if DagManager is not
// transitioned properly
- if (configuration.isDagManagerEnabled()) {
- //Activate DagManager only if TopologyCatalog is initialized. If
not; skip activation.
- if (this.topologyCatalog.getInitComplete().getCount() == 0) {
- this.dagManager.setActive(true);
- this.eventBus.register(this.dagManager);
- }
+ //Activate DagManager only if TopologyCatalog is initialized. If not;
skip activation.
+ if (this.topologyCatalog.getInitComplete().getCount() == 0) {
+ this.dagManager.setActive(true);
+ this.eventBus.register(this.dagManager);
}
if (configuration.isOnlyAnnounceLeader()) {
@@ -346,10 +345,8 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
this.gitConfigMonitor.setActive(false);
}
- if (configuration.isDagManagerEnabled()) {
- this.dagManager.setActive(false);
- this.eventBus.unregister(this.dagManager);
- }
+ this.dagManager.setActive(false);
+ this.eventBus.unregister(this.dagManager);
if (configuration.isOnlyAnnounceLeader()) {
this.d2Announcer.markDownServer();
@@ -359,9 +356,7 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
}
private void registerServicesInLauncher(){
- if (configuration.isTopologyCatalogEnabled()) {
- this.serviceLauncher.addService(topologyCatalog);
- }
+ this.serviceLauncher.addService(topologyCatalog);
if (configuration.isFlowCatalogEnabled()) {
this.serviceLauncher.addService(flowCatalog);
@@ -371,9 +366,7 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
}
}
- if (configuration.isDagManagerEnabled()) {
- this.serviceLauncher.addService(dagManager);
- }
+ this.serviceLauncher.addService(dagManager);
this.serviceLauncher.addService(databaseManager);
this.serviceLauncher.addService(issueRepository);
@@ -526,10 +519,8 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
//Activate the DagManager service, after the topologyCatalog has been
initialized.
if (!this.helixManager.isPresent() || this.helixManager.get().isLeader()){
- if (configuration.isDagManagerEnabled()) {
- this.dagManager.setActive(true);
- this.eventBus.register(this.dagManager);
- }
+ this.dagManager.setActive(true);
+ this.eventBus.register(this.dagManager);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 3b422c55b..ca38eda0c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -17,20 +17,6 @@
package org.apache.gobblin.service.modules.orchestration;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigException;
-import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -51,9 +37,26 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.eventbus.Subscribe;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+import com.typesafe.config.ConfigFactory;
+
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
@@ -91,7 +94,7 @@ import static org.apache.gobblin.service.ExecutionStatus.*;
/**
* This class implements a manager to manage the life cycle of a {@link Dag}.
A {@link Dag} is submitted to the
- * {@link DagManager} by the {@link Orchestrator#orchestrate(Spec)} method. On
receiving a {@link Dag}, the
+ * {@link DagManager} by the {@link Orchestrator#orchestrate} method. On
receiving a {@link Dag}, the
* {@link DagManager} first persists the {@link Dag} to the {@link
DagStateStore}, and then submits it to the specific
* {@link DagManagerThread}'s {@link BlockingQueue} based on the
flowExecutionId of the Flow.
* This guarantees that each {@link Dag} received by the {@link DagManager}
can be recovered in case of a leadership
@@ -194,10 +197,8 @@ public class DagManager extends AbstractIdleService {
DagManagerThread[] dagManagerThreads;
private final ScheduledExecutorService scheduledExecutorPool;
- private final boolean instrumentationEnabled;
private DagStateStore dagStateStore;
private Map<URI, TopologySpec> topologySpecMap;
- private int houseKeepingThreadInitialDelay =
INITIAL_HOUSEKEEPING_THREAD_DELAY;
@Getter
private ScheduledExecutorService houseKeepingThreadPool;
@@ -215,7 +216,7 @@ public class DagManager extends AbstractIdleService {
private final FlowCatalog flowCatalog;
private final FlowCompilationValidationHelper
flowCompilationValidationHelper;
private final Config config;
- private final Optional<EventSubmitter> eventSubmitter;
+ private final EventSubmitter eventSubmitter;
private final long failedDagRetentionTime;
private final DagManagerMetrics dagManagerMetrics;
@@ -226,9 +227,10 @@ public class DagManager extends AbstractIdleService {
private volatile boolean isActive = false;
+ @Inject
public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
FlowStatusGenerator flowStatusGenerator,
- FlowCatalog flowCatalog, boolean instrumentationEnabled) {
+ FlowCatalog flowCatalog) {
this.config = config;
this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY,
DEFAULT_NUM_THREADS);
this.runQueue = (BlockingQueue<Dag<JobExecutionPlan>>[])
initializeDagQueue(this.numThreads);
@@ -237,14 +239,8 @@ public class DagManager extends AbstractIdleService {
this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
this.pollingInterval = ConfigUtils.getInt(config,
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
this.retentionPollingInterval = ConfigUtils.getInt(config,
FAILED_DAG_POLLING_INTERVAL, DEFAULT_FAILED_DAG_POLLING_INTERVAL);
- this.instrumentationEnabled = instrumentationEnabled;
- MetricContext metricContext = null;
- if (instrumentationEnabled) {
- metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
- this.eventSubmitter = Optional.of(new
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
- } else {
- this.eventSubmitter = Optional.absent();
- }
+ MetricContext metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
+ this.eventSubmitter = new EventSubmitter.Builder(metricContext,
"org.apache.gobblin.service").build();
this.dagManagerMetrics = new DagManagerMetrics();
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config,
JOB_START_SLA_UNITS,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis =
jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME,
ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
@@ -284,13 +280,6 @@ public class DagManager extends AbstractIdleService {
return queue;
}
- @Inject
- public DagManager(Config config, JobStatusRetriever jobStatusRetriever,
- SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
FlowStatusGenerator flowStatusGenerator,
- FlowCatalog flowCatalog) {
- this(config, jobStatusRetriever, sharedFlowMetricsSingleton,
flowStatusGenerator, flowCatalog, true);
- }
-
/** Do Nothing on service startup. Scheduling of {@link DagManagerThread}s
and loading of any {@link Dag}s is done
* during leadership change.
*/
@@ -340,13 +329,11 @@ public class DagManager extends AbstractIdleService {
}
private void submitEventsAndSetStatus(Dag<JobExecutionPlan> dag) {
- if (this.eventSubmitter.isPresent()) {
- for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
- JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(dagNode);
- Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
-
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
- jobExecutionPlan.setExecutionStatus(PENDING);
- }
+ for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+ JobExecutionPlan jobExecutionPlan =
DagManagerUtils.getJobExecutionPlan(dagNode);
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+ new TimingEvent(eventSubmitter,
TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+ jobExecutionPlan.setExecutionStatus(PENDING);
}
}
@@ -448,7 +435,7 @@ public class DagManager extends AbstractIdleService {
this.dagManagerThreads = new DagManagerThread[numThreads];
for (int i = 0; i < numThreads; i++) {
DagManagerThread dagManagerThread = new
DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
dagActionStore,
- runQueue[i], cancelQueue[i], resumeQueue[i],
instrumentationEnabled, failedDagIds, this.dagManagerMetrics,
+ runQueue[i], cancelQueue[i], resumeQueue[i], failedDagIds,
this.dagManagerMetrics,
this.defaultJobStartSlaTimeMillis, quotaManager, i);
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0,
this.pollingInterval, TimeUnit.SECONDS);
@@ -457,7 +444,7 @@ public class DagManager extends AbstractIdleService {
this.scheduledExecutorPool.scheduleAtFixedRate(failedDagRetentionThread, 0,
retentionPollingInterval, TimeUnit.MINUTES);
loadDagFromDagStateStore();
this.houseKeepingThreadPool =
Executors.newSingleThreadScheduledExecutor();
- for (int delay = houseKeepingThreadInitialDelay; delay <
MAX_HOUSEKEEPING_THREAD_DELAY; delay *= 2) {
+ for (int delay = INITIAL_HOUSEKEEPING_THREAD_DELAY; delay <
MAX_HOUSEKEEPING_THREAD_DELAY; delay *= 2) {
this.houseKeepingThreadPool.schedule(() -> {
try {
loadDagFromDagStateStore();
@@ -510,8 +497,8 @@ public class DagManager extends AbstractIdleService {
final Map<String, Long> dagToSLA = new HashMap<>();
private final MetricContext metricContext;
private final Set<String> dagIdstoClean = new HashSet<>();
- private final Optional<EventSubmitter> eventSubmitter;
- private final Optional<Timer> jobStatusPolledTimer;
+ private final EventSubmitter eventSubmitter;
+ private final Timer jobStatusPolledTimer;
private final AtomicLong orchestrationDelay = new AtomicLong(0);
private final DagManagerMetrics dagManagerMetrics;
private final UserQuotaManager quotaManager;
@@ -523,13 +510,13 @@ public class DagManager extends AbstractIdleService {
private final BlockingQueue<DagId> resumeQueue;
private final Long defaultJobStartSlaTimeMillis;
private final Optional<DagActionStore> dagActionStore;
- private final Optional<Meter> dagManagerThreadHeartbeat;
+ private final Meter dagManagerThreadHeartbeat;
/**
* Constructor.
*/
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore
dagStateStore, DagStateStore failedDagStateStore,
Optional<DagActionStore> dagActionStore,
BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<DagId> cancelQueue,
- BlockingQueue<DagId> resumeQueue, boolean instrumentationEnabled,
Set<String> failedDagIds, DagManagerMetrics dagManagerMetrics,
+ BlockingQueue<DagId> resumeQueue, Set<String> failedDagIds,
DagManagerMetrics dagManagerMetrics,
Long defaultJobStartSla, UserQuotaManager quotaManager, int
dagMangerThreadId) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
@@ -542,21 +529,13 @@ public class DagManager extends AbstractIdleService {
this.defaultJobStartSlaTimeMillis = defaultJobStartSla;
this.quotaManager = quotaManager;
this.dagActionStore = dagActionStore;
-
- if (instrumentationEnabled) {
- this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
- this.eventSubmitter = Optional.of(new
EventSubmitter.Builder(this.metricContext,
"org.apache.gobblin.service").build());
- this.jobStatusPolledTimer =
Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
- ContextAwareGauge<Long> orchestrationDelayMetric =
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
- orchestrationDelay::get);
- this.metricContext.register(orchestrationDelayMetric);
- this.dagManagerThreadHeartbeat =
Optional.of(this.metricContext.contextAwareMeter(String.format(DAG_MANAGER_HEARTBEAT,
dagMangerThreadId)));
- } else {
- this.metricContext = null;
- this.eventSubmitter = Optional.absent();
- this.jobStatusPolledTimer = Optional.absent();
- this.dagManagerThreadHeartbeat = Optional.absent();
- }
+ this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
+ this.eventSubmitter = new EventSubmitter.Builder(this.metricContext,
"org.apache.gobblin.service").build();
+ this.jobStatusPolledTimer =
this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER);
+ ContextAwareGauge<Long> orchestrationDelayMetric =
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
+ orchestrationDelay::get);
+ this.metricContext.register(orchestrationDelayMetric);
+ this.dagManagerThreadHeartbeat =
this.metricContext.contextAwareMeter(String.format(DAG_MANAGER_HEARTBEAT,
dagMangerThreadId));
}
/**
@@ -645,7 +624,7 @@ public class DagManager extends AbstractIdleService {
node.getValue().setCurrentAttempts(0);
DagManagerUtils.incrementJobGeneration(node);
Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), node.getValue());
-
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
+
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING_RESUME).stop(jobMetadata);
}
// Set flowStartTime so that flow SLA will be based on current time
instead of original flow
@@ -690,7 +669,7 @@ public class DagManager extends AbstractIdleService {
/**
* Cancels the dag and sends a cancellation tracking event.
- * @param dagToCancel dag node to cancel
+ * @param dagId dag node to cancel
* @throws ExecutionException executionException
* @throws InterruptedException interruptedException
*/
@@ -731,11 +710,9 @@ public class DagManager extends AbstractIdleService {
}
private void sendCancellationEvent(JobExecutionPlan jobExecutionPlan) {
- if (this.eventSubmitter.isPresent()) {
- Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
-
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
- jobExecutionPlan.setExecutionStatus(CANCELLED);
- }
+ Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CANCEL).stop(jobMetadata);
+ jobExecutionPlan.setExecutionStatus(CANCELLED);
}
/**
@@ -1040,8 +1017,7 @@ public class DagManager extends AbstractIdleService {
quotaManager.checkQuota(Collections.singleton(dagNode));
producer = DagManagerUtils.getSpecProducer(dagNode);
- TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ?
this.eventSubmitter.get().
- getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) :
null;
+ TimingEvent jobOrchestrationTimer = new
TimingEvent(this.eventSubmitter, TimingEvent.LauncherTimings.JOB_ORCHESTRATED);
// Increment job count before submitting the job onto the spec
producer, in case that throws an exception.
// By this point the quota is allocated, so it's imperative to
increment as missing would introduce the potential to decrement below zero upon
quota release.
@@ -1065,14 +1041,11 @@ public class DagManager extends AbstractIdleService {
jobMetadata.put(TimingEvent.METADATA_MESSAGE,
producer.getExecutionLink(addSpecFuture, specExecutorUri));
// Add serialized job properties as part of the orchestrated job event
metadata
jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY,
dagNode.getValue().toString());
- if (jobOrchestrationTimer != null) {
- jobOrchestrationTimer.stop(jobMetadata);
- }
+ jobOrchestrationTimer.stop(jobMetadata);
log.info("Orchestrated job: {} on Executor: {}",
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
this.dagManagerMetrics.incrementJobsSentToExecutor(dagNode);
} catch (Exception e) {
- TimingEvent jobFailedTimer = this.eventSubmitter.isPresent() ?
this.eventSubmitter.get().
- getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED) : null;
+ TimingEvent jobFailedTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
String message = "Cannot submit job " +
DagManagerUtils.getFullyQualifiedJobName(dagNode) + " on executor " +
specExecutorUri;
log.error(message, e);
jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " +
e.getMessage());
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 152dca00d..97201a4aa 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -28,7 +28,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
@@ -332,8 +331,8 @@ public class DagManagerUtils {
return dagNode.getValue().getSpecExecutor().getUri().toString();
}
- static void emitFlowEvent(Optional<EventSubmitter> eventSubmitter,
Dag<JobExecutionPlan> dag, String flowEvent) {
- if (eventSubmitter.isPresent() && !dag.isEmpty()) {
+ static void emitFlowEvent(EventSubmitter eventSubmitter,
Dag<JobExecutionPlan> dag, String flowEvent) {
+ if (!dag.isEmpty()) {
// Every dag node will contain the same flow metadata
Config config = getDagJobConfig(dag);
Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(config);
@@ -345,7 +344,7 @@ public class DagManagerUtils {
flowMetadata.put(TimingEvent.METADATA_MESSAGE, dag.getMessage());
}
- eventSubmitter.get().getTimingEvent(flowEvent).stop(flowMetadata);
+ eventSubmitter.getTimingEvent(flowEvent).stop(flowMetadata);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index ce7931c82..85c8248fb 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -17,12 +17,6 @@
package org.apache.gobblin.service.modules.orchestration;
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.typesafe.config.Config;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@@ -31,12 +25,24 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
import javax.annotation.Nonnull;
import javax.inject.Inject;
import javax.inject.Singleton;
import lombok.Getter;
import lombok.Setter;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
@@ -68,8 +74,6 @@ import
org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
@@ -83,20 +87,20 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
protected final Logger _log;
protected final SpecCompiler specCompiler;
- protected final Optional<TopologyCatalog> topologyCatalog;
- protected final Optional<DagManager> dagManager;
+ protected final TopologyCatalog topologyCatalog;
+ protected final DagManager dagManager;
protected final MetricContext metricContext;
- protected final Optional<EventSubmitter> eventSubmitter;
+ protected final EventSubmitter eventSubmitter;
private final boolean isFlowConcurrencyEnabled;
@Getter
- private Optional<Meter> flowOrchestrationSuccessFulMeter;
+ private Meter flowOrchestrationSuccessFulMeter;
@Getter
- private Optional<Meter> flowOrchestrationFailedMeter;
+ private Meter flowOrchestrationFailedMeter;
@Getter
- private Optional<Timer> flowOrchestrationTimer;
- private Optional<Counter> flowFailedForwardToDagManagerCounter;
+ private Timer flowOrchestrationTimer;
+ private Counter flowFailedForwardToDagManagerCounter;
@Setter
private FlowStatusGenerator flowStatusGenerator;
@@ -109,10 +113,10 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
private final ClassAliasResolver<SpecCompiler> aliasResolver;
- public Orchestrator(Config config, Optional<TopologyCatalog>
topologyCatalog, Optional<DagManager> dagManager,
- Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, boolean
instrumentationEnabled,
- Optional<FlowTriggerHandler> flowTriggerHandler,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
- Optional<FlowCatalog> flowCatalog) {
+ @Inject
+ public Orchestrator(Config config, TopologyCatalog topologyCatalog,
DagManager dagManager,
+ Optional<Logger> log, FlowStatusGenerator flowStatusGenerator,
Optional<FlowTriggerHandler> flowTriggerHandler,
+ SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
Optional<FlowCatalog> flowCatalog) {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
this.topologyCatalog = topologyCatalog;
@@ -135,25 +139,15 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
}
//At this point, the TopologySpecMap is initialized by the SpecCompiler.
Pass the TopologySpecMap to the DagManager.
- if (this.dagManager.isPresent()) {
-
this.dagManager.get().setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
- }
+ this.dagManager.setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
+
+ this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(config),
this.specCompiler.getClass());
+ this.flowOrchestrationSuccessFulMeter =
this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER);
+ this.flowOrchestrationFailedMeter =
this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER);
+ this.flowOrchestrationTimer =
this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER);
+ this.flowFailedForwardToDagManagerCounter =
this.metricContext.counter(ServiceMetricNames.FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT);
+ this.eventSubmitter = new EventSubmitter.Builder(this.metricContext,
"org.apache.gobblin.service").build();
- if (instrumentationEnabled) {
- this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(config),
this.specCompiler.getClass());
- this.flowOrchestrationSuccessFulMeter =
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER));
- this.flowOrchestrationFailedMeter =
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER));
- this.flowOrchestrationTimer =
Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER));
- this.flowFailedForwardToDagManagerCounter =
Optional.of(this.metricContext.counter(ServiceMetricNames.FLOW_FAILED_FORWARD_TO_DAG_MANAGER_COUNT));
- this.eventSubmitter = Optional.of(new
EventSubmitter.Builder(this.metricContext,
"org.apache.gobblin.service").build());
- } else {
- this.metricContext = null;
- this.flowOrchestrationSuccessFulMeter = Optional.absent();
- this.flowOrchestrationFailedMeter = Optional.absent();
- this.flowOrchestrationTimer = Optional.absent();
- this.flowFailedForwardToDagManagerCounter = Optional.absent();
- this.eventSubmitter = Optional.absent();
- }
this.isFlowConcurrencyEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.FLOW_CONCURRENCY_ALLOWED,
ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
quotaManager =
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
@@ -163,15 +157,6 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
quotaManager, eventSubmitter, flowStatusGenerator,
isFlowConcurrencyEnabled);
}
- @Inject
- public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator,
Optional<TopologyCatalog> topologyCatalog,
- Optional<DagManager> dagManager, Optional<Logger> log,
Optional<FlowTriggerHandler> flowTriggerHandler,
- SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
Optional<FlowCatalog> flowCatalog) {
- this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true,
flowTriggerHandler,
- sharedFlowMetricsSingleton, flowCatalog);
- }
-
-
@VisibleForTesting
public SpecCompiler getSpecCompiler() {
return this.specCompiler;
@@ -199,9 +184,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion,
Properties headers) {
_log.info("Spec deletion detected: " + deletedSpecURI + "/" +
deletedSpecVersion);
- if (topologyCatalog.isPresent()) {
- this.specCompiler.onDeleteSpec(deletedSpecURI, deletedSpecVersion,
headers);
- }
+ this.specCompiler.onDeleteSpec(deletedSpecURI, deletedSpecVersion,
headers);
}
/** {@inheritDoc} */
@@ -226,13 +209,12 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
} catch (Exception e) {
_log.error("Failed to update Spec: " + updatedSpec, e);
}
-
}
public void orchestrate(Spec spec, Properties jobProps, long
triggerTimestampMillis, boolean isReminderEvent)
throws Exception {
// Add below waiting because TopologyCatalog and FlowCatalog service can
be launched at the same time
- this.topologyCatalog.get().getInitComplete().await();
+ this.topologyCatalog.getInitComplete().await();
//Wait for the SpecCompiler to become healthy.
this.getSpecCompiler().awaitHealthy();
@@ -261,9 +243,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow orchestration
skipped because no trigger timestamp "
+ "associated with flow action.");
- if (this.eventSubmitter.isPresent()) {
- new TimingEvent(this.eventSubmitter.get(),
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
- }
+ new TimingEvent(this.eventSubmitter,
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
return;
}
@@ -273,8 +253,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
_log.info("Multi-active scheduler finished handling trigger event:
[{}, is: {}, triggerEventTimestamp: {}]",
flowAction, isReminderEvent ? "reminder" : "original",
triggerTimestampMillis);
} else {
- Optional<TimingEvent> flowCompilationTimer =
- this.eventSubmitter.transform(submitter -> new
TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+ TimingEvent flowCompilationTimer = new
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
Optional<Dag<JobExecutionPlan>> compiledDagOptional =
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
spec, flowGroup,
flowName);
@@ -284,7 +263,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
return;
}
Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
- if (compiledDag == null || compiledDag.isEmpty()) {
+ if (compiledDag.isEmpty()) {
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
spec, flowMetadata);
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
@@ -296,47 +275,10 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
compiledDag);
- if (flowCompilationTimer.isPresent()) {
- flowCompilationTimer.get().stop(flowMetadata);
- }
+ flowCompilationTimer.stop(flowMetadata);
// Depending on if DagManager is present, handle execution
- if (this.dagManager.isPresent()) {
- submitFlowToDagManager(flowSpec, compiledDag);
- } else {
- // Schedule all compiled JobSpecs on their respective Executor
- for (Dag.DagNode<JobExecutionPlan> dagNode : compiledDag.getNodes())
{
- DagManagerUtils.incrementJobAttempt(dagNode);
- JobExecutionPlan jobExecutionPlan = dagNode.getValue();
-
- // Run this spec on selected executor
- SpecProducer producer = null;
- try {
- producer =
jobExecutionPlan.getSpecExecutor().getProducer().get();
- Spec jobSpec = jobExecutionPlan.getJobSpec();
-
- if (!((JobSpec)
jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
- _log.warn("JobSpec does not contain flowExecutionId: {}",
jobSpec);
- }
-
- Map<String, String> jobMetadata =
TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
- _log.info(String.format("Going to orchestrate JobSpec: %s on
Executor: %s", jobSpec, producer));
-
- Optional<TimingEvent> jobOrchestrationTimer =
this.eventSubmitter.transform(
- submitter -> new TimingEvent(submitter,
TimingEvent.LauncherTimings.JOB_ORCHESTRATED));
-
- producer.addSpec(jobSpec);
-
- if (jobOrchestrationTimer.isPresent()) {
- jobOrchestrationTimer.get().stop(jobMetadata);
- }
- } catch (Exception e) {
- _log.error("Cannot successfully setup spec: " +
jobExecutionPlan.getJobSpec() + " on executor: " + producer
- + " for flow: " + spec, e);
- }
- }
- deleteSpecFromCatalogIfAdhoc(flowSpec);
- }
+ submitFlowToDagManager(flowSpec, compiledDag);
}
} else {
Instrumented.markMeter(this.flowOrchestrationFailedMeter);
@@ -362,7 +304,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
throws IOException {
try {
// Send the dag to the DagManager
- this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+ this.dagManager.addDag(jobExecutionPlanDag, true, true);
/*
Adhoc flows can be deleted after persisting it in DagManager as the
DagManager's failure recovery method ensures
@@ -373,15 +315,11 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
} catch (Exception ex) {
String failureMessage = "Failed to add Job Execution Plan due to: " +
ex.getMessage();
_log.warn("Orchestrator call - " + failureMessage, ex);
- if (this.flowFailedForwardToDagManagerCounter.isPresent()) {
- this.flowFailedForwardToDagManagerCounter.get().inc();
- }
- if (this.eventSubmitter.isPresent()) {
- // pronounce failed before stack unwinds, to ensure flow not marooned
in `COMPILED` state; (failure likely attributable to DB connection/failover)
- Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
- flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
- new TimingEvent(this.eventSubmitter.get(),
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
- }
+ this.flowFailedForwardToDagManagerCounter.inc();
+ // pronounce failed before stack unwinds, to ensure flow not marooned in
`COMPILED` state; (failure likely attributable to DB connection/failover)
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
+ new TimingEvent(this.eventSubmitter,
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
throw ex;
}
}
@@ -393,10 +331,8 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
if (spec instanceof FlowSpec) {
//Send the dag to the DagManager to stop it.
//Also send it to the SpecProducer to do any cleanup tasks on
SpecExecutor.
- if (this.dagManager.isPresent()) {
- _log.info("Forwarding cancel request for flow URI {} to DagManager.",
spec.getUri());
- this.dagManager.get().stopDag(spec.getUri());
- }
+ _log.info("Forwarding cancel request for flow URI {} to DagManager.",
spec.getUri());
+ this.dagManager.stopDag(spec.getUri());
// We need to recompile the flow to find the spec producer,
// If compilation result is different, its remove request can go to some
different spec producer
deleteFromExecutor(spec, headers);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 7ccc094b1..cd1bd421f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -17,12 +17,6 @@
package org.apache.gobblin.service.modules.scheduler;
-import com.codahale.metrics.MetricFilter;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.net.URI;
import java.text.ParseException;
@@ -38,12 +32,35 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.helix.HelixManager;
+import org.quartz.CronExpression;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.InterruptableJob;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.UnableToInterruptJobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.MetricFilter;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
@@ -76,19 +93,6 @@ import
org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
-import org.apache.helix.HelixManager;
-import org.quartz.CronExpression;
-import org.quartz.DisallowConcurrentExecution;
-import org.quartz.InterruptableJob;
-import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.SchedulerException;
-import org.quartz.Trigger;
-import org.quartz.UnableToInterruptJobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
@@ -168,7 +172,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
@Inject
public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String
serviceName,
Config config,
- Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
Optional<TopologyCatalog> topologyCatalog,
+ Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
Orchestrator orchestrator, SchedulerService schedulerService,
Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
@Named(InjectionNames.WARM_STANDBY_ENABLED) boolean isWarmStandbyEnabled,
Optional<FlowTriggerHandler> flowTriggerHandler) throws Exception {
@@ -207,13 +211,13 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
}
public GobblinServiceJobScheduler(String serviceName, Config config,
FlowStatusGenerator flowStatusGenerator,
- Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
Optional<TopologyCatalog> topologyCatalog,
- Optional<DagManager> dagManager, Optional<UserQuotaManager>
quotaManager, SchedulerService schedulerService,
+ Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
TopologyCatalog topologyCatalog,
+ DagManager dagManager, Optional<UserQuotaManager> quotaManager,
SchedulerService schedulerService,
Optional<Logger> log, boolean isWarmStandbyEnabled, Optional
<FlowTriggerHandler> flowTriggerHandler,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton)
throws Exception {
- this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
- new Orchestrator(config, flowStatusGenerator, topologyCatalog,
dagManager, log, flowTriggerHandler,
+ this(serviceName, config, helixManager, flowCatalog,
+ new Orchestrator(config, topologyCatalog, dagManager, log,
flowStatusGenerator, flowTriggerHandler,
sharedFlowMetricsSingleton, flowCatalog),
schedulerService, quotaManager, log, isWarmStandbyEnabled,
flowTriggerHandler);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 78b5446bf..674ff0024 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -56,7 +56,7 @@ public final class FlowCompilationValidationHelper {
private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
private final SpecCompiler specCompiler;
private final UserQuotaManager quotaManager;
- private final Optional<EventSubmitter> eventSubmitter;
+ private final EventSubmitter eventSubmitter;
private final FlowStatusGenerator flowStatusGenerator;
private final boolean isFlowConcurrencyEnabled;
@@ -66,7 +66,7 @@ public final class FlowCompilationValidationHelper {
* caller.
* @param flowSpec
* @param optionalFlowExecutionId for scheduled (non-ad-hoc) flows, to pass
the ID "laundered" via the DB;
- * see: {@link MysqlMultiActiveLeaseArbiter
javadoc section titled
+ * see: {@link
org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter javadoc section
titled
* `Database event_timestamp laundering`}
* @return jobExecutionPlan dag if one can be constructed for the given
flowSpec
*/
@@ -79,8 +79,7 @@ public final class FlowCompilationValidationHelper {
//Wait for the SpecCompiler to become healthy.
specCompiler.awaitHealthy();
- Optional<TimingEvent> flowCompilationTimer =
- this.eventSubmitter.transform(submitter -> new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILED));
+ TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter,
TimingEvent.FlowTimings.FLOW_COMPILED);
Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
flowName);
Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
@@ -89,15 +88,13 @@ public final class FlowCompilationValidationHelper {
return Optional.absent();
}
- if (jobExecutionPlanDagOptional.get() == null ||
jobExecutionPlanDagOptional.get().isEmpty()) {
+ if (jobExecutionPlanDagOptional.get().isEmpty()) {
populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec,
flowMetadata);
return Optional.absent();
}
addFlowExecutionIdIfAbsent(flowMetadata, optionalFlowExecutionId,
jobExecutionPlanDagOptional.get());
- if (flowCompilationTimer.isPresent()) {
- flowCompilationTimer.get().stop(flowMetadata);
- }
+ flowCompilationTimer.stop(flowMetadata);
return jobExecutionPlanDagOptional;
}
@@ -133,9 +130,7 @@ public final class FlowCompilationValidationHelper {
Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow failed because
another instance is running and concurrent "
+ "executions are disabled. Set flow.allowConcurrentExecution to
true in the flow spec to change this behaviour.");
- if (eventSubmitter.isPresent()) {
- new TimingEvent(eventSubmitter.get(),
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
- }
+ new TimingEvent(eventSubmitter,
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
return Optional.absent();
}
}
@@ -158,7 +153,7 @@ public final class FlowCompilationValidationHelper {
* @param spec
* @param flowMetadata
*/
- public static void
populateFlowCompilationFailedEventMessage(Optional<EventSubmitter>
eventSubmitter, Spec spec,
+ public static void populateFlowCompilationFailedEventMessage(EventSubmitter
eventSubmitter, Spec spec,
Map<String, String> flowMetadata) {
// For scheduled flows, we do not insert the flowExecutionId into the
FlowSpec. As a result, if the flow
// compilation fails (i.e. we are unable to find a path), the metadata
will not have flowExecutionId.
@@ -172,12 +167,7 @@ public final class FlowCompilationValidationHelper {
}
flowMetadata.put(TimingEvent.METADATA_MESSAGE, message);
- Optional<TimingEvent> flowCompileFailedTimer =
eventSubmitter.transform(submitter ->
- new TimingEvent(submitter,
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED));
-
- if (flowCompileFailedTimer.isPresent()) {
- flowCompileFailedTimer.get().stop(flowMetadata);
- }
+ new TimingEvent(eventSubmitter,
TimingEvent.FlowTimings.FLOW_COMPILE_FAILED).stop(flowMetadata);
}
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index 15fa2eee6..e27ce534e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -43,8 +43,6 @@ import
org.apache.gobblin.metastore.FileContextBasedFsStateStore;
import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
import org.apache.gobblin.metastore.FsStateStore;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.function.CheckedExceptionFunction;
@@ -63,8 +61,7 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
@Inject
public FsJobStatusRetriever(Config config, MultiContextIssueRepository
issueRepository) {
- super(ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY,
- ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED),
issueRepository);
+ super(issueRepository);
this.stateStore = (FileContextBasedFsStateStore<State>) new
FileContextBasedFsStateStoreFactory().
createStateStore(config.getConfig(CONF_PREFIX), State.class);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
index 84dbb60ce..887ee5827 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/LocalFsJobStatusRetriever.java
@@ -37,7 +37,6 @@ import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.runtime.spec_executorInstance.LocalFsSpecProducer;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.ServiceConfigKeys;
/**
@@ -54,7 +53,7 @@ public class LocalFsJobStatusRetriever extends
JobStatusRetriever {
// Do not use a state store for this implementation, just look at the job
folder that @LocalFsSpecProducer writes to
@Inject
public LocalFsJobStatusRetriever(Config config, MultiContextIssueRepository
issueRepository) {
- super(ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED,
issueRepository);
+ super(issueRepository);
this.specProducerPath = config.getString(CONF_PREFIX +
LocalFsSpecProducer.LOCAL_FS_PRODUCER_PATH_KEY);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
index 0b593165d..6b8d1e251 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetriever.java
@@ -35,8 +35,6 @@ import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.util.ConfigUtils;
/**
@@ -65,8 +63,7 @@ public class MysqlJobStatusRetriever extends
JobStatusRetriever {
@Inject
public MysqlJobStatusRetriever(Config config, MultiContextIssueRepository
issueRepository) throws ReflectiveOperationException {
- super(ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY,
- ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED),
issueRepository);
+ super(issueRepository);
config =
config.getConfig(MYSQL_JOB_STATUS_RETRIEVER_PREFIX).withFallback(config);
this.stateStore =
(MysqlJobStatusStateStoreFactory.class.newInstance()).createStateStore(config,
State.class);
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 08e89ef0b..38bfc5bee 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -28,9 +28,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
-
-import
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
-import
org.apache.gobblin.service.modules.orchestration.ServiceAzkabanConfigKeys;
import org.apache.hadoop.fs.Path;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jgit.api.Git;
@@ -61,13 +58,20 @@ import
org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
-import org.apache.gobblin.service.monitoring.GitConfigMonitor;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
+import
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import
org.apache.gobblin.service.modules.orchestration.ServiceAzkabanConfigKeys;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
+import org.apache.gobblin.service.monitoring.GitConfigMonitor;
import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.util.ConfigUtils;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
+
public class GobblinServiceManagerTest {
@@ -193,14 +197,29 @@ public class GobblinServiceManagerTest {
this.gitForPush.commit().setMessage("First commit").call();
this.gitForPush.push().setRemote("origin").setRefSpecs(new
RefSpec("master")).call();
- this.gobblinServiceManager = GobblinServiceManager.create("CoreService",
"1",
- ConfigUtils.propertiesToConfig(serviceCoreProperties), new
Path(SERVICE_WORK_DIR));
+ this.gobblinServiceManager =
createTestGobblinServiceManager(serviceCoreProperties);
+
this.gobblinServiceManager.start();
this.flowConfigClient = new
FlowConfigV2Client(String.format("http://127.0.0.1:%s/",
this.gobblinServiceManager.getRestLiServerListeningURI().getPort()),
transportClientProperties);
}
+ public static GobblinServiceManager
createTestGobblinServiceManager(Properties serviceCoreProperties) {
+ return createTestGobblinServiceManager(serviceCoreProperties,
"CoreService", "1", SERVICE_WORK_DIR);
+ }
+
+ public static GobblinServiceManager
createTestGobblinServiceManager(Properties serviceCoreProperties,
+ String serviceName, String serviceId, String serviceWorkDir) {
+ GobblinServiceManager gobblinServiceManager =
GobblinServiceManager.create(serviceName, serviceId,
+ ConfigUtils.propertiesToConfig(serviceCoreProperties), new
Path(serviceWorkDir));
+
+ DagManager spiedDagManager = spy(gobblinServiceManager.getDagManager());
+ doNothing().when(spiedDagManager).setActive(anyBoolean());
+ gobblinServiceManager.dagManager = spiedDagManager;
+ return gobblinServiceManager;
+ }
+
private void cleanUpDir(String dir) throws Exception {
File specStoreDir = new File(dir);
if (specStoreDir.exists()) {
@@ -555,8 +574,7 @@ null, null, null, null);
private void serviceReboot() throws Exception {
this.gobblinServiceManager.stop();
- this.gobblinServiceManager = GobblinServiceManager.create("CoreService",
"1",
- ConfigUtils.propertiesToConfig(serviceCoreProperties), new
Path(SERVICE_WORK_DIR));
+ this.gobblinServiceManager =
createTestGobblinServiceManager(serviceCoreProperties);
this.gobblinServiceManager.start();
this.flowConfigClient = new
FlowConfigV2Client(String.format("http://127.0.0.1:%s/",
this.gobblinServiceManager.getRestLiServerListeningURI().getPort()),
transportClientProperties);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index 29782af11..3a61a7301 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -23,7 +23,6 @@ import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
-import org.apache.hadoop.fs.Path;
import org.eclipse.jetty.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,12 +45,12 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowConfigClient;
import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.GobblinServiceManagerTest;
import org.apache.gobblin.service.Schedule;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.TestServiceDatabaseConfig;
import org.apache.gobblin.service.modules.utils.HelixUtils;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
-import org.apache.gobblin.util.ConfigUtils;
@Test
@@ -173,13 +172,13 @@ public class GobblinServiceHATest {
node2ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
// Start Node 1
- this.node1GobblinServiceManager =
GobblinServiceManager.create("CoreService1", "1",
- ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), new
Path(NODE_1_SERVICE_WORK_DIR));
+ this.node1GobblinServiceManager =
GobblinServiceManagerTest.createTestGobblinServiceManager(
+ node1ServiceCoreProperties, "CoreService1", "1",
NODE_1_SERVICE_WORK_DIR);
this.node1GobblinServiceManager.start();
// Start Node 2
- this.node2GobblinServiceManager =
GobblinServiceManager.create("CoreService2", "2",
- ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), new
Path(NODE_2_SERVICE_WORK_DIR));
+ this.node2GobblinServiceManager =
GobblinServiceManagerTest.createTestGobblinServiceManager(
+ node2ServiceCoreProperties, "CoreService2", "2",
NODE_2_SERVICE_WORK_DIR);
this.node2GobblinServiceManager.start();
// Initialize Node 1 Client
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
index 4f6fbded9..7d2648a28 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
@@ -46,12 +46,12 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowConfigClient;
import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.GobblinServiceManagerTest;
import org.apache.gobblin.service.Schedule;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.TestServiceDatabaseConfig;
import org.apache.gobblin.service.modules.utils.HelixUtils;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
-import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PortUtils;
@@ -177,13 +177,13 @@ public class GobblinServiceRedirectTest {
node2ServiceCoreProperties.put(ServiceConfigKeys.SERVICE_PORT, port2);
// Start Node 1
- this.node1GobblinServiceManager =
GobblinServiceManager.create("RedirectCoreService1", "1",
- ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), new
Path(NODE_1_SERVICE_WORK_DIR));
+ this.node1GobblinServiceManager =
GobblinServiceManagerTest.createTestGobblinServiceManager(
+ node1ServiceCoreProperties, "RedirectCoreService1", "1",
NODE_1_SERVICE_WORK_DIR);
this.node1GobblinServiceManager.start();
// Start Node 2
- this.node2GobblinServiceManager =
GobblinServiceManager.create("RedirectCoreService2", "2",
- ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), new
Path(NODE_2_SERVICE_WORK_DIR));
+ this.node2GobblinServiceManager =
GobblinServiceManagerTest.createTestGobblinServiceManager(
+ node2ServiceCoreProperties, "RedirectCoreService2", "2",
NODE_2_SERVICE_WORK_DIR);
this.node2GobblinServiceManager.start();
// Initialize Node 1 Client
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index a72f45732..d61d9a6de 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -90,7 +90,7 @@ public class DagManagerFlowTest {
dagActionStore = new MysqlDagActionStore(config);
dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.KILL);
dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId_2,
DagActionStore.FlowActionType.RESUME);
- dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props),
false);
+ dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props));
dagManager.dagActionStore = Optional.of(dagActionStore);
dagManager.setActive(true);
this.dagNumThreads = dagManager.getNumThreads();
@@ -391,9 +391,9 @@ class CancelPredicate implements Predicate<Void> {
class MockedDagManager extends DagManager {
- public MockedDagManager(Config config, boolean instrumentationEnabled) {
+ public MockedDagManager(Config config) {
super(config, createJobStatusRetriever(),
Mockito.mock(SharedFlowMetricsSingleton.class),
- Mockito.mock(FlowStatusGenerator.class),
Mockito.mock(FlowCatalog.class), instrumentationEnabled);
+ Mockito.mock(FlowStatusGenerator.class),
Mockito.mock(FlowCatalog.class));
}
private static JobStatusRetriever createJobStatusRetriever() {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index 2babd0683..d5aec8ad4 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -106,7 +106,7 @@ public class DagManagerTest {
this._dagManagerMetrics.activate();
this._dagManagerThread = new
DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore,
_failedDagStateStore,
Optional.absent(), queue, cancelQueue,
- resumeQueue, true, new HashSet<>(), this._dagManagerMetrics,
START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0);
+ resumeQueue, new HashSet<>(), this._dagManagerMetrics,
START_SLA_DEFAULT, _gobblinServiceQuotaManager, 0);
Field jobToDagField =
DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
jobToDagField.setAccessible(true);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 7e7daef9b..8d64b0978 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -17,24 +17,33 @@
package org.apache.gobblin.service.modules.orchestration;
-import com.codahale.metrics.MetricRegistry;
-import com.google.common.base.Optional;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.typesafe.config.Config;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
+
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.typesafe.config.Config;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
@@ -46,15 +55,10 @@ import
org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-import static org.mockito.Mockito.*;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
public class OrchestratorTest {
@@ -73,10 +77,7 @@ public class OrchestratorTest {
private TopologySpec topologySpec;
private FlowCatalog flowCatalog;
- private SpecCatalogListener mockListener;
private FlowSpec flowSpec;
- private FlowStatusGenerator mockStatusGenerator;
- private FlowTriggerHandler _mockFlowTriggerHandler;
private Orchestrator orchestrator;
@BeforeClass
@@ -103,12 +104,14 @@ public class OrchestratorTest {
Optional.of(logger), Optional.<MetricContext>absent(), true, true);
this.serviceLauncher.addService(flowCatalog);
- this.mockStatusGenerator = mock(FlowStatusGenerator.class);
+ FlowStatusGenerator mockStatusGenerator = mock(FlowStatusGenerator.class);
+ FlowTriggerHandler mockFlowTriggerHandler = mock(FlowTriggerHandler.class);
+ DagManager mockDagManager = mock(DagManager.class);
+ doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
- this._mockFlowTriggerHandler = mock(FlowTriggerHandler.class);
this.orchestrator = new
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
- this.mockStatusGenerator, Optional.of(this.topologyCatalog),
Optional.<DagManager>absent(), Optional.of(logger),
- Optional.of(this._mockFlowTriggerHandler), new
SharedFlowMetricsSingleton(
+ this.topologyCatalog, mockDagManager, Optional.of(logger),
mockStatusGenerator,
+ Optional.of(mockFlowTriggerHandler), new SharedFlowMetricsSingleton(
ConfigUtils.propertiesToConfig(orchestratorProperties)),
Optional.of(mock(FlowCatalog.class)));
this.topologyCatalog.addListener(orchestrator);
this.flowCatalog.addListener(orchestrator);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index 0fdfc894b..45f8323a3 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -50,7 +50,6 @@ import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalogTest;
-import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.ServiceConfigKeys;
@@ -58,9 +57,9 @@ import
org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
+import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
import
org.apache.gobblin.service.modules.orchestration.InMemoryUserQuotaManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
-import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
@@ -135,7 +134,7 @@ public class GobblinServiceJobSchedulerTest {
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new
TestGobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.of(flowCatalog), null,
mockOrchestrator, Optional.of(quotaManager), null, false);
+ ConfigFactory.empty(), Optional.of(flowCatalog), mockOrchestrator,
Optional.of(quotaManager), null, false);
SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
@@ -221,7 +220,7 @@ public class GobblinServiceJobSchedulerTest {
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new
TestGobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.of(flowCatalog), null,
mockOrchestrator, Optional.of(new InMemoryUserQuotaManager(quotaConfig)), null,
false);
+ ConfigFactory.empty(), Optional.of(flowCatalog), mockOrchestrator,
Optional.of(new InMemoryUserQuotaManager(quotaConfig)), null, false);
SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
@@ -284,7 +283,7 @@ public class GobblinServiceJobSchedulerTest {
SchedulerService schedulerService = new SchedulerService(new Properties());
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new
TestGobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.of(flowCatalog), null,
mockOrchestrator, Optional.of(new InMemoryUserQuotaManager(quotaConfig)),
schedulerService, false);
+ ConfigFactory.empty(), Optional.of(flowCatalog), mockOrchestrator,
Optional.of(new InMemoryUserQuotaManager(quotaConfig)), schedulerService,
false);
schedulerService.startAsync().awaitRunning();
scheduler.startUp();
@@ -355,7 +354,7 @@ public class GobblinServiceJobSchedulerTest {
SchedulerService schedulerService = new SchedulerService(new Properties());
// Mock a GaaS scheduler not in warm standby mode
GobblinServiceJobScheduler scheduler = new
GobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
null, mockOrchestrator, schedulerService,
+ ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
mockOrchestrator, schedulerService,
Optional.of(new InMemoryUserQuotaManager(quotaConfig)),
Optional.absent(), false, Optional.of(Mockito.mock(
FlowTriggerHandler.class)));
@@ -374,9 +373,9 @@ public class GobblinServiceJobSchedulerTest {
//Mock a GaaS scheduler in warm standby mode, where we don't check quota
GobblinServiceJobScheduler schedulerWithWarmStandbyEnabled = new
GobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
null, mockOrchestrator, schedulerService,
- Optional.of(new InMemoryUserQuotaManager(quotaConfig)),
Optional.absent(), true, Optional.of(Mockito.mock(
- FlowTriggerHandler.class)));
+ ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
mockOrchestrator, schedulerService,
+ Optional.of(new InMemoryUserQuotaManager(quotaConfig)),
Optional.absent(), true,
+ Optional.of(Mockito.mock(FlowTriggerHandler.class)));
schedulerWithWarmStandbyEnabled.startUp();
schedulerWithWarmStandbyEnabled.setActive(true);
@@ -396,9 +395,9 @@ public class GobblinServiceJobSchedulerTest {
private boolean hasScheduler = false;
public TestGobblinServiceJobScheduler(String serviceName, Config config,
- Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog>
topologyCatalog, Orchestrator orchestrator, Optional<UserQuotaManager>
quotaManager,
+ Optional<FlowCatalog> flowCatalog, Orchestrator orchestrator,
Optional<UserQuotaManager> quotaManager,
SchedulerService schedulerService, boolean isWarmStandbyEnabled)
throws Exception {
- super(serviceName, config, Optional.absent(), flowCatalog,
topologyCatalog, orchestrator, schedulerService,
+ super(serviceName, config, Optional.absent(), flowCatalog, orchestrator,
schedulerService,
quotaManager, Optional.absent(), isWarmStandbyEnabled,
Optional.of(Mockito.mock(FlowTriggerHandler.class)));
if (schedulerService != null) {
hasScheduler = true;
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
index 257726e1d..5af639cb5 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java
@@ -32,7 +32,6 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.ServiceConfigKeys;
import static org.mockito.Mockito.mock;
@@ -46,9 +45,7 @@ public class FsJobStatusRetrieverTest extends
JobStatusRetrieverTest {
cleanUpDir();
Config config = ConfigFactory.empty()
.withValue(FsJobStatusRetriever.CONF_PREFIX + "." +
ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
- ConfigValueFactory.fromAnyRef(stateStoreDir))
- .withValue(ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY,
- ConfigValueFactory.fromAnyRef("true"));
+ ConfigValueFactory.fromAnyRef(stateStoreDir));
this.jobStatusRetriever = new FsJobStatusRetriever(config,
mock(MultiContextIssueRepository.class));
}
@@ -83,31 +80,31 @@ public class FsJobStatusRetrieverTest extends
JobStatusRetrieverTest {
addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY,
ExecutionStatus.COMPILED.name());
Assert.assertEquals(ExecutionStatus.COMPILED,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
+
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME,
JOB_ORCHESTRATED_TIME);
Assert.assertEquals(ExecutionStatus.COMPILED,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
+
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY,
ExecutionStatus.ORCHESTRATED.name());
Assert.assertEquals(ExecutionStatus.ORCHESTRATED,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
+
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.RUNNING.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
Assert.assertEquals(ExecutionStatus.ORCHESTRATED,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
+
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY,
ExecutionStatus.RUNNING.name());
Assert.assertEquals(ExecutionStatus.RUNNING,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
+
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.COMPLETE.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
Assert.assertEquals(ExecutionStatus.RUNNING,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
+
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY,
ExecutionStatus.COMPLETE.name());
Assert.assertEquals(ExecutionStatus.COMPLETE,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
+
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
}
@Override
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTestWithoutDagManager.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTestWithoutDagManager.java
deleted file mode 100644
index c91fc5be3..000000000
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTestWithoutDagManager.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.monitoring;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.io.FileUtils;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
-import org.apache.gobblin.service.ExecutionStatus;
-
-import static org.mockito.Mockito.mock;
-
-
-public class FsJobStatusRetrieverTestWithoutDagManager extends
JobStatusRetrieverTest {
-
- private String stateStoreDir = "/tmp/jobStatusRetrieverTest/statestore";
-
- @BeforeClass
- public void setUp() throws Exception {
- cleanUpDir();
- Config config =
ConfigFactory.empty().withValue(FsJobStatusRetriever.CONF_PREFIX + "." +
ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
- ConfigValueFactory.fromAnyRef(stateStoreDir));
- this.jobStatusRetriever = new FsJobStatusRetriever(config,
mock(MultiContextIssueRepository.class));
- }
-
- @Test
- public void testGetJobStatusesForFlowExecution() throws IOException {
- super.testGetJobStatusesForFlowExecution();
- }
-
- @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
- public void testJobTiming() throws Exception {
- super.testJobTiming();
- }
-
- @Test (dependsOnMethods = "testJobTiming")
- public void testOutOfOrderJobTimingEvents() throws IOException {
- super.testOutOfOrderJobTimingEvents();
- }
-
- @Test (dependsOnMethods = "testJobTiming")
- public void testGetJobStatusesForFlowExecution1() {
- super.testGetJobStatusesForFlowExecution1();
- }
-
- @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
- public void testGetLatestExecutionIdsForFlow() throws Exception {
- super.testGetLatestExecutionIdsForFlow();
- }
-
- @Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow")
- public void testGetFlowStatusFromJobStatuses() throws Exception {
- long flowExecutionId = 1237L;
-
- addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY,
ExecutionStatus.COMPILED.name());
- Assert.assertEquals(ExecutionStatus.$UNKNOWN,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
-
- addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY,
ExecutionStatus.ORCHESTRATED.name());
- Assert.assertEquals(ExecutionStatus.$UNKNOWN,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
-
- addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME,
JOB_ORCHESTRATED_TIME);
- Assert.assertEquals(ExecutionStatus.ORCHESTRATED,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
-
- addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY,
ExecutionStatus.RUNNING.name());
- Assert.assertEquals(ExecutionStatus.ORCHESTRATED,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
-
- addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.RUNNING.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
- Assert.assertEquals(ExecutionStatus.RUNNING,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
-
- addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY,
ExecutionStatus.COMPLETE.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
- Assert.assertEquals(ExecutionStatus.RUNNING,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
-
- addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.COMPLETE.name());
- Assert.assertEquals(ExecutionStatus.COMPLETE,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
- }
-
- @Override
- protected void cleanUpDir() throws Exception {
- File specStoreDir = new File(this.stateStoreDir);
- if (specStoreDir.exists()) {
- FileUtils.deleteDirectory(specStoreDir);
- }
- }
-}
\ No newline at end of file
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
index 00b2598c8..345b5d7cc 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/JobStatusRetrieverTest.java
@@ -175,7 +175,7 @@ public abstract class JobStatusRetrieverTest {
Assert.assertEquals(jobStatus.getJobGroup(), jobGroup);
Assert.assertFalse(jobStatusIterator.hasNext());
Assert.assertEquals(ExecutionStatus.RUNNING,
-
this.jobStatusRetriever.getFlowStatusFromJobStatuses(this.jobStatusRetriever.dagManagerEnabled,
this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
+
this.jobStatusRetriever.getFlowStatusFromJobStatuses(this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_2,
ExecutionStatus.RUNNING.name());
jobStatusIterator =
this.jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
index f26ebf59e..66d552e54 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTest.java
@@ -56,7 +56,6 @@ public class MysqlJobStatusRetrieverTest extends
JobStatusRetrieverTest {
configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX
+ "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, jdbcUrl);
configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX
+ "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX
+ "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD);
-
configBuilder.addPrimitive(ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY,
"true");
this.jobStatusRetriever =
new MysqlJobStatusRetriever(configBuilder.build(),
mock(MultiContextIssueRepository.class));
@@ -95,31 +94,31 @@ public class MysqlJobStatusRetrieverTest extends
JobStatusRetrieverTest {
addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY,
ExecutionStatus.COMPILED.name());
Assert.assertEquals(ExecutionStatus.COMPILED,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
+
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME,
JOB_ORCHESTRATED_TIME);
Assert.assertEquals(ExecutionStatus.COMPILED,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
+
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY,
ExecutionStatus.ORCHESTRATED.name());
Assert.assertEquals(ExecutionStatus.ORCHESTRATED,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
+
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.RUNNING.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
Assert.assertEquals(ExecutionStatus.ORCHESTRATED,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
+
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY,
ExecutionStatus.RUNNING.name());
Assert.assertEquals(ExecutionStatus.RUNNING,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
+
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.COMPLETE.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
Assert.assertEquals(ExecutionStatus.RUNNING,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
+
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY,
ExecutionStatus.COMPLETE.name());
Assert.assertEquals(ExecutionStatus.COMPLETE,
-
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
+
jobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME,
FLOW_GROUP, flowExecutionId)));
}
@Test
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTestWithoutDagManager.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTestWithoutDagManager.java
deleted file mode 100644
index 03a621927..000000000
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/MysqlJobStatusRetrieverTestWithoutDagManager.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.monitoring;
-
-import java.io.IOException;
-
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import org.apache.gobblin.config.ConfigBuilder;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.metastore.MysqlJobStatusStateStore;
-import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
-import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.troubleshooter.MultiContextIssueRepository;
-import org.apache.gobblin.service.ExecutionStatus;
-import org.apache.gobblin.service.ServiceConfigKeys;
-
-import static org.mockito.Mockito.mock;
-
-
-/**
- * Flow status can be different when DagManager is not being used. So we need
separate unit tests for testing job/flow
- * status when DagManager is disabled.
- */
-public class MysqlJobStatusRetrieverTestWithoutDagManager extends
JobStatusRetrieverTest {
- private MysqlJobStatusStateStore<State> dbJobStateStore;
- private static final String TEST_USER = "testUser";
- private static final String TEST_PASSWORD = "testPassword";
-
- @BeforeClass
- @Override
- public void setUp() throws Exception {
- ITestMetastoreDatabase testMetastoreDatabase =
TestMetastoreDatabaseFactory.get();
- String jdbcUrl = testMetastoreDatabase.getJdbcUrl();
-
- ConfigBuilder configBuilder = ConfigBuilder.create();
-
configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX
+ "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, jdbcUrl);
-
configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX
+ "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
-
configBuilder.addPrimitive(MysqlJobStatusRetriever.MYSQL_JOB_STATUS_RETRIEVER_PREFIX
+ "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD);
-
-
configBuilder.addPrimitive(ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY,
"false");
- this.jobStatusRetriever =
- new MysqlJobStatusRetriever(configBuilder.build(),
mock(MultiContextIssueRepository.class));
- this.dbJobStateStore = ((MysqlJobStatusRetriever)
this.jobStatusRetriever).getStateStore();
- cleanUpDir();
- }
-
- @Test
- public void testGetJobStatusesForFlowExecution() throws IOException {
- super.testGetJobStatusesForFlowExecution();
- }
-
- @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution")
- public void testJobTiming() throws Exception {
- super.testJobTiming();
- }
-
- @Test (dependsOnMethods = "testJobTiming")
- public void testOutOfOrderJobTimingEvents() throws IOException {
- super.testOutOfOrderJobTimingEvents();
- }
-
- @Test (dependsOnMethods = "testJobTiming")
- public void testGetJobStatusesForFlowExecution1() {
- super.testGetJobStatusesForFlowExecution1();
- }
-
- @Test (dependsOnMethods = "testGetJobStatusesForFlowExecution1")
- public void testGetLatestExecutionIdsForFlow() throws Exception {
- super.testGetLatestExecutionIdsForFlow();
- }
-
- @Test (dependsOnMethods = "testGetLatestExecutionIdsForFlow")
- public void testGetFlowStatusFromJobStatuses() throws Exception {
- long flowExecutionId = 1237L;
-
- addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY,
ExecutionStatus.COMPILED.name());
- Assert.assertEquals(ExecutionStatus.$UNKNOWN,
-
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
-
- addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY,
ExecutionStatus.ORCHESTRATED.name());
- Assert.assertEquals(ExecutionStatus.$UNKNOWN,
-
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
-
- addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.ORCHESTRATED.name(), JOB_ORCHESTRATED_TIME,
JOB_ORCHESTRATED_TIME);
- Assert.assertEquals(ExecutionStatus.ORCHESTRATED,
-
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
-
- addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY,
ExecutionStatus.RUNNING.name());
- Assert.assertEquals(ExecutionStatus.ORCHESTRATED,
-
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
-
- addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.RUNNING.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
- Assert.assertEquals(ExecutionStatus.RUNNING,
-
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
-
- addJobStatusToStateStore(flowExecutionId, JobStatusRetriever.NA_KEY,
ExecutionStatus.COMPLETE.name(), JOB_ORCHESTRATED_TIME, JOB_ORCHESTRATED_TIME);
- Assert.assertEquals(ExecutionStatus.RUNNING,
-
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
-
- addJobStatusToStateStore(flowExecutionId, MY_JOB_NAME_1,
ExecutionStatus.COMPLETE.name());
- Assert.assertEquals(ExecutionStatus.COMPLETE,
-
JobStatusRetriever.getFlowStatusFromJobStatuses(jobStatusRetriever.dagManagerEnabled,
jobStatusRetriever.getJobStatusesForFlowExecution(FLOW_NAME, FLOW_GROUP,
flowExecutionId)));
- }
-
- @Override
- void cleanUpDir() throws Exception {
-
this.dbJobStateStore.delete(KafkaJobStatusMonitor.jobStatusStoreName(FLOW_GROUP,
FLOW_NAME));
- }
-}