[
https://issues.apache.org/jira/browse/GOBBLIN-2136?focusedWorklogId=932527&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-932527
]
ASF GitHub Bot logged work on GOBBLIN-2136:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 29/Aug/24 23:04
Start Date: 29/Aug/24 23:04
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4031:
URL: https://github.com/apache/gobblin/pull/4031#discussion_r1737127941
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -168,21 +160,6 @@ protected void shutDown() throws Exception {
this.listeners.close();
}
- /***************************************************
- /* Catalog listeners *
- /**************************************************/
-
- protected void notifyAllListeners() {
- try {
- Iterator<URI> uriIterator = getSpecURIs();
- while (uriIterator.hasNext()) {
- this.listeners.onAddSpec(getSpecWrapper(uriIterator.next()));
- }
- } catch (IOException e) {
- log.error("Cannot retrieve specs from catalog:", e);
- }
- }
-
Review Comment:
wondering about this...
there's still an `addListener` method. is there no longer any need to
`notifyListeners`?
##########
gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java:
##########
@@ -205,10 +163,10 @@ public class ServiceConfigKeys {
public static final String ISSUE_REPO_CLASS = GOBBLIN_SERVICE_PREFIX + "issueRepo.class";
public static final String GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "dagProcessingEngine.";
- public static final String DAG_PROCESSING_ENGINE_ENABLED = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "enabled";
public static final String NUM_DAG_PROC_THREADS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions";
-
public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
- public static final String GOBBLIN_SERVICE_MULTI_ACTIVE_EXECUTION_ENABLED = GOBBLIN_SERVICE_PREFIX + "multiActiveExecutionEnabled";
+ public static final String JOB_START_SLA_TIME = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
+ public static final String JOB_START_SLA_UNITS = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
+ public static final long DEFAULT_FLOW_SLA_MILLIS = TimeUnit.HOURS.toMillis(24);
Review Comment:
three thoughts here:
1. previously the property included the `.dagManager` segment:
```
// Default job start SLA time if configured, measured in minutes. Default is 10 minutes
// todo - rename "sla" -> "deadline", and move them to DagProcUtils
public static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
```
should we continue to accept that legacy prop name so existing flow defs
remain b/w compat?
2. as we introduce a new prop name, let's take the opportunity to align
naming and semantics by calling it `start.deadline`, rather than SLA
3. given that `.dagProcessingEngine` is as much impl. detail as
`.dagManager`, it may not belong in the customer-facing name.
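On point 1, a minimal sketch of honoring the legacy name, assuming a plain `Map<String, String>` stands in for the Typesafe `Config` the service actually reads; `LegacyKeyFallback`, `resolveLong`, and the key strings are hypothetical. The new key wins, the legacy key is used with a deprecation warning, and the default applies otherwise:

```java
import java.util.Map;

/** Hypothetical helper: resolves a value from a new key, then a legacy key, then a default. */
final class LegacyKeyFallback {
  private LegacyKeyFallback() {}

  static long resolveLong(Map<String, String> config, String newKey, String legacyKey, long defaultValue) {
    if (config.containsKey(newKey)) {
      // The new (customer-facing) name always wins when both are set.
      return Long.parseLong(config.get(newKey));
    }
    if (config.containsKey(legacyKey)) {
      // Keep existing flow definitions working, but nudge users toward the new name.
      System.err.printf("Config key '%s' is deprecated; use '%s' instead%n", legacyKey, newKey);
      return Long.parseLong(config.get(legacyKey));
    }
    return defaultValue;
  }
}
```

With Typesafe Config, `config.hasPath(key)` would play the role of `containsKey`. The same pattern would also cover the quota keys in `AbstractUserQuotaManager`, which raise the same b/w-compat question.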
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -74,7 +74,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
// to these data structures.
@Getter
@Setter
- protected final Map<URI, TopologySpec> topologySpecMap;
+ protected Map<URI, TopologySpec> topologySpecMap;
Review Comment:
I didn't notice why this wouldn't still be `final`. is a derived class
assigning to it?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java:
##########
@@ -55,7 +55,7 @@
@Slf4j
public class DagManagerMetrics {
Review Comment:
rename to `DagMetrics` or leave as-is?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java:
##########
@@ -299,4 +303,43 @@ public int hashCode() {
public String toString() {
return this.getNodes().stream().map(node -> node.getValue().toString()).collect(Collectors.toList()).toString();
}
+
+ public enum FlowState {
+ FAILED(-1),
+ RUNNING(0),
+ SUCCESSFUL(1);
+
+ public final int value;
+
+ FlowState(int value) {
+ this.value = value;
+ }
+ }
+
+ @Getter
+ @EqualsAndHashCode
+ public static class DagId {
+ String flowGroup;
+ String flowName;
+ long flowExecutionId;
+
+ public DagId(String flowGroup, String flowName, long flowExecutionId) {
+ this.flowGroup = flowGroup;
+ this.flowName = flowName;
+ this.flowExecutionId = flowExecutionId;
+ }
Review Comment:
`@AllArgsConstructor`? actually, if you can make the three fields `final`, `@Data`
should take care of many of these things for you
(not clear if that's possible, since the fields aren't `private`... even if
not `@Data` and/or `final`, can we make them `private`?)
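For reference, a hand-written sketch of `DagId` with `private final` fields; `DagIdSketch` is a hypothetical stand-in name. Lombok's `@Data` on a class whose fields are all `final` generates roughly this boilerplate (a required-args constructor, getters, `equals`/`hashCode`, `toString`), so the explicit constructor could go away:

```java
import java.util.Objects;

/** Sketch of DagId with private, final fields; @Data would generate roughly this. */
final class DagIdSketch {
  private final String flowGroup;
  private final String flowName;
  private final long flowExecutionId;

  DagIdSketch(String flowGroup, String flowName, long flowExecutionId) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
  }

  public String getFlowGroup() { return flowGroup; }
  public String getFlowName() { return flowName; }
  public long getFlowExecutionId() { return flowExecutionId; }

  // Value semantics: two ids are equal when all three fields match.
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof DagIdSketch)) return false;
    DagIdSketch other = (DagIdSketch) o;
    return flowExecutionId == other.flowExecutionId
        && Objects.equals(flowGroup, other.flowGroup)
        && Objects.equals(flowName, other.flowName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(flowGroup, flowName, flowExecutionId);
  }

  @Override
  public String toString() {
    return "DagId(flowGroup=" + flowGroup + ", flowName=" + flowName
        + ", flowExecutionId=" + flowExecutionId + ")";
  }
}
```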
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java:
##########
@@ -37,9 +38,9 @@
*/
@Slf4j
abstract public class AbstractUserQuotaManager implements UserQuotaManager {
- public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota";
- public static final String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
- public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+ public static final String PER_USER_QUOTA = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "perUserQuota";
+ public static final String PER_FLOWGROUP_QUOTA = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "perFlowGroupQuota";
+ public static final String USER_JOB_QUOTA_KEY = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "defaultJobQuota";
Review Comment:
again, should we honor the old prop names for b/w compat?
also, when we devise a new name to deprecate the old one, let's avoid
embedding impl-specific identifiers in what should otherwise be a user-level
config option w/ clear and steady semantics
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -115,6 +109,31 @@ protected void shutDown()
this.scheduledExecutorPool.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS);
}
+ /**
+ * Action to be performed on a {@link Dag}, in case of a job failure. Currently, we allow 2 modes:
+ * <ul>
+ * <li> FINISH_RUNNING, which allows currently running jobs to finish.</li>
+ * <li> FINISH_ALL_POSSIBLE, which allows every possible job in the Dag to finish, as long as all the dependencies
+ * of the job are successful.</li>
+ * </ul>
+ */
+ public enum FailureOption {
+ FINISH_RUNNING("FINISH_RUNNING"),
+ CANCEL("CANCEL"),
+ FINISH_ALL_POSSIBLE("FINISH_ALL_POSSIBLE");
+
+ private final String failureOption;
+
+ FailureOption(final String failureOption) {
+ this.failureOption = failureOption;
+ }
+
+ @Override
+ public String toString() {
+ return this.failureOption;
+ }
+ }
Review Comment:
somehow I thought that with:
```
public enum FailureOption {
FINISH_RUNNING, CANCEL, FINISH_ALL_POSSIBLE;
}
```
the `.toString()` would automatically print the enum's name, no?
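That matches `java.lang.Enum`: unless overridden, `toString()` returns `name()`, the constant's identifier exactly as declared. A minimal check using a hypothetical stand-in, `FailureOptionSketch`:

```java
/** Plain enum: the inherited Enum.toString() already returns the declared constant name. */
enum FailureOptionSketch {
  FINISH_RUNNING, CANCEL, FINISH_ALL_POSSIBLE
}

final class FailureOptionDemo {
  public static void main(String[] args) {
    for (FailureOptionSketch option : FailureOptionSketch.values()) {
      // toString() and name() agree, and valueOf() round-trips the string form.
      System.out.println(option + " " + option.name().equals(option.toString())
          + " " + (FailureOptionSketch.valueOf(option.toString()) == option));
    }
  }
}
```

So the string-valued field and the overridden `toString()` could likely be dropped without changing any printed value; `DEFAULT_FLOW_FAILURE_OPTION` already relies on `.name()`, which yields the same string.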
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java:
##########
@@ -69,7 +70,7 @@ public class MySqlDagManagementStateStore implements DagManagementStateStore {
Map<URI, TopologySpec> topologySpecMap;
private final Config config;
public static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
- public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
+ public static final String DAG_STATESTORE_CLASS_KEY = ServiceConfigKeys.GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "dagStateStoreClass";
Review Comment:
will this render all present configs silently ignored?
(presuming there's a default... if not, I expect it would instead likely
fail fast)
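If the old keys are not honored, one way to avoid silently ignoring them is to fail fast (or at least warn) at startup when any key under the legacy prefix is still set. A sketch, assuming a plain `Map<String, String>` view of the config; `LegacyPrefixCheck` and its methods are hypothetical, and `"gobblin.service.dagManager."` is an assumed value for the old `DAG_MANAGER_PREFIX`:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical startup check that surfaces config keys no longer read after the rename. */
final class LegacyPrefixCheck {
  private LegacyPrefixCheck() {}

  /** Returns, in sorted order, every configured key that starts with the legacy prefix. */
  static List<String> keysUnderPrefix(Map<String, String> config, String legacyPrefix) {
    return config.keySet().stream()
        .filter(key -> key.startsWith(legacyPrefix))
        .sorted()
        .collect(Collectors.toList());
  }

  /** Throws instead of letting stale keys be silently ignored. */
  static void failFastOnLegacyKeys(Map<String, String> config, String legacyPrefix) {
    List<String> stale = keysUnderPrefix(config, legacyPrefix);
    if (!stale.isEmpty()) {
      throw new IllegalStateException(
          "Config keys under '" + legacyPrefix + "' are no longer read: " + stale);
    }
  }
}
```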
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -57,31 +57,25 @@
@Singleton
public class DagProcessingEngine extends AbstractIdleService {
- @Getter private final Optional<DagTaskStream> dagTaskStream;
- @Getter Optional<DagManagementStateStore> dagManagementStateStore;
+ @Getter private final DagTaskStream dagTaskStream;
+ @Getter DagManagementStateStore dagManagementStateStore;
private final Config config;
- private final Optional<DagProcFactory> dagProcFactory;
+ private final DagProcFactory dagProcFactory;
private ScheduledExecutorService scheduledExecutorPool;
private final DagProcessingEngineMetrics dagProcEngineMetrics;
private static final Integer TERMINATION_TIMEOUT = 30;
public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS = "defaultJobStartDeadlineTimeMillis";
@Getter static long defaultJobStartSlaTimeMillis;
+ public static final String DEFAULT_FLOW_FAILURE_OPTION = DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name();
Review Comment:
we don't need the `DagProcessingEngine.` qualifier inside that same
class, do we?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java:
##########
@@ -271,8 +267,7 @@ protected static MetricNameRegexFilter getMetricsFilterForDagManager() {
}
public void cleanup() {
- // Add null check so that unit test will not affect each other when we de-active non-instrumented DagManager
- if(this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManager.class.getSimpleName())) {
+ if (this.metricContext != null && this.metricContext.getTagMap().get(GobblinMetricsKeys.CLASS_META).equals(DagManagerMetrics.class.getSimpleName())) {
// The DMThread's metrics mappings follow the lifecycle of the DMThread itself and so are lost by DM deactivation-reactivation but the RootMetricContext is a (persistent) singleton.
// To avoid IllegalArgumentException by the RMC preventing (re-)add of a metric already known, remove all metrics that a new DMThread thread would attempt to add (in DagManagerThread::initialize) whenever running post-re-enablement
RootMetricContext.get().removeMatching(getMetricsFilterForDagManager());
Review Comment:
this code may need reworking (since aspects recorded in the comments seem to
be changing), but at the least, update the remaining comment and also consider
whether to rename `getMetricsFilterForDagManager()`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java:
##########
@@ -27,17 +27,15 @@
/**
- * An interface for storing and retrieving currently running {@link Dag<JobExecutionPlan>}s. In case of a leadership
- * change in the {@link org.apache.gobblin.service.modules.core.GobblinServiceManager}, the corresponding {@link DagManager}
- * loads the running {@link Dag}s from the {@link DagStateStore} to resume their execution.
+ * An interface for storing and retrieving currently running {@link Dag<JobExecutionPlan>}s.
*/
@Alpha
Review Comment:
shall we remove?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStore.java:
##########
@@ -84,8 +81,4 @@ default Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException
*/
@Deprecated
Set<String> getDagIds() throws IOException;
-
- default boolean existsDag(DagManager.DagId dagId) throws IOException {
- throw new UnsupportedOperationException("containsDag not implemented");
- }
Review Comment:
interesting to see this go away... is the thinking that one would merely
`getDag` rather than check for existence?
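If so, existence could still be offered as a thin default on top of `getDag`. A sketch with hypothetical, simplified types (`SimpleDagStore`, `InMemoryDagStore`, `String` ids, a generic DAG type) that assumes `getDag` returns `null` for an unknown id; a MySQL-backed store might still prefer a dedicated existence query to avoid deserializing a whole DAG just to check for it:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified store: existence derived from getDag, assumed to return null when absent. */
interface SimpleDagStore<D> {
  D getDag(String dagId) throws IOException;

  default boolean existsDag(String dagId) throws IOException {
    return getDag(dagId) != null;
  }
}

/** In-memory implementation used only to exercise the default method. */
final class InMemoryDagStore<D> implements SimpleDagStore<D> {
  private final Map<String, D> dags = new HashMap<>();

  void put(String dagId, D dag) {
    dags.put(dagId, dag);
  }

  @Override
  public D getDag(String dagId) {
    return dags.get(dagId);
  }
}
```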
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java:
##########
@@ -136,11 +135,6 @@ public void cleanUp(String dagId)
this.totalDagCount.dec();
Review Comment:
is this in-memory metric still valid in a multi-leader ensemble?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -26,9 +26,9 @@
/**
* Manages the statically configured user quotas for the proxy user in user.to.proxy configuration, the API requester(s)
- * and the flow group.
- * It is used by the {@link DagManager} to ensure that the number of currently running jobs do not exceed the quota, if
- * the quota is exceeded, the execution will fail without running on the underlying executor.
+ * and the flow group. It is used by the {@link org.apache.gobblin.service.modules.orchestration.proc.DagProc} to ensure
Review Comment:
specifically, is it the `LaunchDagProc`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceJobStartDeadlineDagProc.java:
##########
@@ -63,7 +63,7 @@ protected void enforceDeadline(DagManagementStateStore dagManagementStateStore,
}
Dag.DagNode<JobExecutionPlan> dagNode = dagNodeToCheckDeadline.getLeft().get();
- long timeOutForJobStart = DagManagerUtils.getJobStartSla(dagNode, DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
+ long timeOutForJobStart = DagUtils.getJobStartSla(dagNode, DagProcessingEngine.getDefaultJobStartSlaTimeMillis());
Review Comment:
let's use the class-rename as an opportunity to update the method too -
`getJobStartDeadline`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
Review Comment:
I do believe they should be separate... and since they're such closely named
classes now, consider augmenting the javadoc that these utils contain
functionality for *processing* a DAG, whereas `DagUtils` is for *interrogating*
a DAG. the javadoc for each might reference the other (circularity should be
ok for javadoc)
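For instance, the paired class-level javadoc could read along these lines (the wording is only a suggestion, and the class bodies are placeholders standing in for the real utility methods):

```java
/**
 * Utilities for <em>interrogating</em> a {@code Dag}: reading deadlines, start times, and
 * other properties without changing any state.
 *
 * <p>For utilities that <em>process</em> a {@code Dag}, see {@link DagProcUtils}.
 */
final class DagUtils {
  private DagUtils() {}
}

/**
 * Utilities for <em>processing</em> a {@code Dag}, as performed by the DagProcs.
 *
 * <p>For read-only utilities that merely <em>interrogate</em> a {@code Dag}, see {@link DagUtils}.
 */
final class DagProcUtils {
  private DagProcUtils() {}
}
```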
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -50,16 +50,14 @@
/**
- * Helper class with functionality meant to be re-used between the DagManager and Orchestrator when launching
+ * Helper class with functionality meant to be re-used between the LaunchDagProc and Orchestrator when launching
* executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
- * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
- * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
- * restart or leadership change the DagManager has to perform validations before executing any launch actions the
- * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
- * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateful,
+ * validations, and creates DagActions. The DagProcessingEngine's responsibility is to
+ * process out dag action requests. However, with launch executions now being stored in the DagActionStateStore, on
+ * restart, the LaunchDagProc has to perform validations before executing any launch actions the previous LaunchDagProc
Review Comment:
the LaunchDP was unable to complete? more like, "that are not yet fully
processed".
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/EnforceFlowFinishDeadlineDagProc.java:
##########
@@ -48,8 +48,8 @@ public EnforceFlowFinishDeadlineDagProc(EnforceFlowFinishDeadlineDagTask enforce
protected void enforceDeadline(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag,
DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
Dag.DagNode<JobExecutionPlan> dagNode = dag.getNodes().get(0);
- long flowFinishDeadline = DagManagerUtils.getFlowSLA(dagNode);
- long flowStartTime = DagManagerUtils.getFlowStartTime(dagNode);
+ long flowFinishDeadline = DagUtils.getFlowSLA(dagNode);
Review Comment:
let's use the class-rename as an opportunity to update the method too -
`getFlowFinishDeadline`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -67,64 +61,51 @@
*/
@Slf4j
public class DagActionStoreChangeMonitor extends HighLevelConsumer<String, DagActionStoreChangeEvent> {
Review Comment:
I realize we no longer have a need for both `DagActionStoreCM` and also
`DagManagementDagActionStoreCM`, but it's much clearer to read AND less error
prone to keep the latter class as-is, rather than porting over what's different
about it here into what had been its base class.
I'm not against eventually consolidating those into something like this, but
doing so in a PR w/ >110 files carries unnecessary risk that we may have
introduced a small error in the porting.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java:
##########
@@ -204,7 +204,9 @@ protected void createMetrics() {
this.messageProcessedMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
this.duplicateMessagesMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_DUPLICATE_MESSAGES);
this.heartbeatMessagesMeter = this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_HEARTBEAT_MESSAGES);
- this.produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS, () -> produceToConsumeDelayValue);
- this.getMetricContext().register(this.produceToConsumeDelayMillis);
+ // Reports delay from all partitions in one gauge
+ ContextAwareGauge<Long> produceToConsumeDelayMillis = this.getMetricContext().newContextAwareGauge(
Review Comment:
all the other metrics have an instance member. why use a local for this one?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -50,16 +50,14 @@
/**
- * Helper class with functionality meant to be re-used between the DagManager and Orchestrator when launching
+ * Helper class with functionality meant to be re-used between the LaunchDagProc and Orchestrator when launching
* executions of a flow spec. In the common case, the Orchestrator receives a flow to orchestrate, performs necessary
- * validations, and forwards the execution responsibility to the DagManager. The DagManager's responsibility is to
- * carry out any flow action requests. However, with launch executions now being stored in the DagActionStateStore, on
- * restart or leadership change the DagManager has to perform validations before executing any launch actions the
- * previous leader was unable to complete. Rather than duplicating the code or introducing a circular dependency between
- * the DagManager and Orchestrator, this class is utilized to store the common functionality. It is stateful,
+ * validations, and creates DagActions. The DagProcessingEngine's responsibility is to
+ * process out dag action requests. However, with launch executions now being stored in the DagActionStateStore, on
Review Comment:
extra "out" in "process out dag"... but this could simply be:
> creates {@link DagAction}s, which the {@link DagProcessingEngine} is responsible for processing.
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java:
##########
@@ -45,8 +48,8 @@ public void setUp() {
// Tests that if exceeding the quota on startup, do not throw an exception and do not decrement the counter
@Test
public void testExceedsQuotaOnStartup() throws Exception {
- List<Dag<JobExecutionPlan>> dags = DagManagerTest.buildDagList(2, "user", ConfigFactory.empty());
- // Ensure that the current attempt is 1, normally done by DagManager
+ List<Dag<JobExecutionPlan>> dags = DagTestUtils.buildDagList(2, "user", ConfigFactory.empty());
+ // Ensure that the current attempt is 1, normally done by DagProcs
Review Comment:
which one, launch? reeval?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitConfigMonitorTest.java:
##########
@@ -79,50 +78,47 @@ public class GitConfigMonitorTest {
private final File testFlowFile2 = new File(testGroupDir, TEST_FLOW_FILE2);
private final File testFlowFile3 = new File(testGroupDir, TEST_FLOW_FILE3);
- private RefSpec masterRefSpec = new RefSpec("master");
+ private final RefSpec masterRefSpec = new RefSpec("master");
private FlowCatalog flowCatalog;
- private SpecCatalogListener mockListener;
- private Config config;
private GitConfigMonitor gitConfigMonitor;
@BeforeClass
public void setup() throws Exception {
- cleanUpDir(TEST_DIR);
+ cleanUpDir();
// Create a bare repository
RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED);
- this.remoteRepo = fileKey.open(false);
- this.remoteRepo.create(true);
+ Repository remoteRepo = fileKey.open(false);
+ remoteRepo.create(true);
- this.gitForPush = Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
+ this.gitForPush = Git.cloneRepository().setURI(remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
// push an empty commit as a base for detecting changes
this.gitForPush.commit().setMessage("First commit").call();
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
- this.config = ConfigBuilder.create()
+ Config config = ConfigBuilder.create()
.addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_URI,
- this.remoteRepo.getDirectory().getAbsolutePath())
- .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/jobConfig")
- .addPrimitive(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog")
- .addPrimitive(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
- .build();
+ remoteRepo.getDirectory().getAbsolutePath())
+ .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR,
+ TEST_DIR + "/jobConfig").addPrimitive(FlowCatalog.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog")
+ .addPrimitive(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5).build();
this.flowCatalog = new FlowCatalog(config);
- this.mockListener = mock(SpecCatalogListener.class);
- when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
- when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+ SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
+ when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_ORCHESTRATOR_LISTENER_CLASS);
+ when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse<>(""));
this.flowCatalog.addListener(mockListener);
this.flowCatalog.startAsync().awaitRunning();
- this.gitConfigMonitor = new GitConfigMonitor(this.config, this.flowCatalog);
+ this.gitConfigMonitor = new GitConfigMonitor(config, this.flowCatalog);
this.gitConfigMonitor.setActive(true);
}
- private void cleanUpDir(String dir) {
- File specStoreDir = new File(dir);
+ private void cleanUpDir() {
+ File specStoreDir = new File(GitConfigMonitorTest.TEST_DIR);
Review Comment:
NBD, but I personally favor keeping the param and hard-coding the value at the
call site, rather than dropping the param to hard-code the value in the impl.
the former is not only clearer to read, but more future-proof
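A sketch of the parameterized form, called as `cleanUpDir(TEST_DIR)`. The full original body is not shown in the diff, so this `java.nio.file` recursive delete (and the `CleanUpDirSketch` class name) is an assumption about what the helper does:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class CleanUpDirSketch {
  private CleanUpDirSketch() {}

  /** Recursively deletes {@code dir} if it exists; the caller decides which directory. */
  static void cleanUpDir(String dir) throws IOException {
    Path root = Paths.get(dir);
    if (!Files.exists(root)) {
      return;
    }
    List<Path> paths;
    try (Stream<Path> walk = Files.walk(root)) {
      // Reverse order visits children before their parent directories.
      paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
    }
    for (Path path : paths) {
      Files.delete(path);
    }
  }
}
```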
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -78,32 +76,19 @@ public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream
private final Config config;
@Getter private final EventSubmitter eventSubmitter;
protected MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter;
- protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
- private final boolean isMultiActiveExecutionEnabled;
+ protected DagActionReminderScheduler dagActionReminderScheduler;
private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
private final BlockingQueue<DagActionStore.LeaseParams> leaseParamsQueue = new LinkedBlockingQueue<>();
private final DagManagementStateStore dagManagementStateStore;
private final DagProcessingEngineMetrics dagProcEngineMetrics;
@Inject
- public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagActionStore,
- @Named(ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME) MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter,
Review Comment:
don't we need this name for init/config-time discernment between two
different versions of the MALA?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagTestUtils.java:
##########
@@ -81,7 +83,7 @@ public static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId) th
addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId).
addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).build();
if (i > 0) {
- jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job" + (i - 1)));
+ jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job0"));
Review Comment:
semantics have changed here. if we're confident we always want "job0",
should we remove the `if (i > 0)` (or change it to `if (i == 1)`)?
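To make the difference concrete: the old line builds a chain (each job depends on its predecessor), the new one a fan-out from `job0`, and the two agree only for `i == 1`. A sketch using a hypothetical helper, `JobDependencySketch`, that maps each job name to its single dependency (assuming `suffix` equals `i`, as the old `"job" + (i - 1)` implies):

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper contrasting the two dependency layouts in DagTestUtils.buildDag. */
final class JobDependencySketch {
  private JobDependencySketch() {}

  /** Old behavior: job_i depends on job_(i-1), forming a linear chain. */
  static Map<String, String> chain(int numJobs) {
    Map<String, String> deps = new LinkedHashMap<>();
    for (int i = 1; i < numJobs; i++) {
      deps.put("job" + i, "job" + (i - 1));
    }
    return deps;
  }

  /** New behavior: every job_i (i > 0) depends on job0, forming a fan-out. */
  static Map<String, String> fanOut(int numJobs) {
    Map<String, String> deps = new LinkedHashMap<>();
    for (int i = 1; i < numJobs; i++) {
      deps.put("job" + i, "job0");
    }
    return deps;
  }
}
```

With 2 jobs both layouts give `{job1=job0}`; from 3 jobs on they diverge (`job2` depends on `job1` versus `job0`), so any test relying on the chain shape would be affected.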
##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitConfigMonitorTest.java:
##########
@@ -241,14 +237,9 @@ public void testForcedPushConfig() throws IOException, GitAPIException, URISynta
Collection<Spec> specs = this.flowCatalog.getSpecs();
- Assert.assertTrue(specs.size() == 2);
+ Assert.assertEquals(specs.size(), 2);
List<Spec> specList = Lists.newArrayList(specs);
- specList.sort(new Comparator<Spec>() {
- @Override
- public int compare(Spec o1, Spec o2) {
- return o1.getUri().compareTo(o2.getUri());
- }
- });
+ specList.sort(Comparator.comparing(Spec::getUri));
Review Comment:
great improvement!
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -329,22 +207,15 @@ public void remove(Spec spec, Properties headers) throws IOException {
if (spec instanceof FlowSpec) {
String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
String flowName = FlowSpec.Utils.getFlowName(uri);
- if (this.flowLaunchHandler.isPresent()) {
- List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
- _log.info("Found {} flows to cancel.", flowExecutionIds.size());
-
- for (long flowExecutionId : flowExecutionIds) {
- DagActionStore.DagAction killDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
- DagActionStore.DagActionType.KILL);
- DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(killDagAction, false,
- System.currentTimeMillis());
- flowLaunchHandler.get().handleFlowKillTriggerEvent(new Properties(), leaseParams);
- }
- } else {
- //Send the dag to the DagManager to stop it.
- //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor.
- _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
- this.dagManager.stopDag(uri);
+ List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+ _log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+ for (long flowExecutionId : flowExecutionIds) {
+ DagActionStore.DagAction killDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
Review Comment:
indentation
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagFlowTest.java:
##########
Review Comment:
is this the right class? isn't this more of a "JobSpecTest"?
Issue Time Tracking
-------------------
Worklog Id: (was: 932527)
Remaining Estimate: 0h
Time Spent: 10m
> remove obsolete code related to DagManager
> ------------------------------------------
>
> Key: GOBBLIN-2136
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2136
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)