phet commented on code in PR #3944:
URL: https://github.com/apache/gobblin/pull/3944#discussion_r1591493291
##########
gobblin-restli/server.gradle:
##########
@@ -45,6 +45,7 @@ dependencies {
}
compile externalDependency.gson
+ compile externalDependency.lombok
Review Comment:
unclear how this was missed... but not having it meant MANY compilation warnings
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -81,12 +84,15 @@ public class OrchestratorTest {
private FlowCatalog flowCatalog;
private FlowSpec flowSpec;
- private Orchestrator orchestrator;
+
+ private FlowStatusGenerator mockFlowStatusGenerator;
+ private DagManager mockDagManager;
+ private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator;
private static final String TEST_USER = "testUser";
private static final String TEST_PASSWORD = "testPassword";
private static final String TEST_TABLE = "quotas";
- @BeforeClass
+ @BeforeMethod
Review Comment:
unfortunately, there are lots of "testing smells" here.
first off, class-level test init doesn't play well w/ verifying mock
interactions, since we want a fresh count for the execution of each test method.
relatedly, it also complicates, if not outright forecloses, parallel
execution of test methods.
secondly, the `@Test(dependsOnMethods = ...)` structuring is a testing
anti-pattern that here obscured that `setup`-style init had been formulated
instead as a test of its own: `createTopologySpec()`. a major clue that
something is wrong is that this particular `@Test` method does not even exercise
`Orchestrator`, our class-under-test!
for now, I've fixed this suite to follow per-method setup/teardown best
practices, but left it as a TODO to figure out where `createTopologySpec`
actually belongs. `createFlowSpec` and `deleteFlowSpec` also deserve attention
in this same regard.
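To make the first smell concrete, here's a minimal plain-Java sketch of why a fixture built once per class accumulates interaction counts across test methods, whereas one rebuilt per method (what TestNG's `@BeforeMethod` gives us) starts every method from zero. It deliberately avoids TestNG and Mockito, and all names (`CountingDagManagerFake`, `FixtureDemo`) are hypothetical stand-ins, not Gobblin code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for a mocked collaborator: it only counts calls. */
class CountingDagManagerFake {
  private int addDagCallCount = 0;

  void addDag(String flowName) {
    addDagCallCount++;
  }

  int addDagCalls() {
    return addDagCallCount;
  }
}

/** Simulates running "test methods" against a fixture built once vs. once per method. */
class FixtureDemo {
  // Each simulated test method makes one addDag call, then reports the count it observes,
  // i.e. the value a `verify(mock, times(1))` style check would be evaluated against.
  static int runOneTestMethod(CountingDagManagerFake fixture, String flowName) {
    fixture.addDag(flowName);
    return fixture.addDagCalls();
  }

  /** Like @BeforeClass: one fixture shared by every test method. */
  static List<Integer> sharedFixture(int testMethods) {
    CountingDagManagerFake shared = new CountingDagManagerFake();
    List<Integer> observed = new ArrayList<>();
    for (int i = 0; i < testMethods; i++) {
      observed.add(runOneTestMethod(shared, "flow" + i));
    }
    return observed;
  }

  /** Like @BeforeMethod: a fresh fixture for each test method. */
  static List<Integer> perMethodFixture(int testMethods) {
    Supplier<CountingDagManagerFake> setup = CountingDagManagerFake::new;
    List<Integer> observed = new ArrayList<>();
    for (int i = 0; i < testMethods; i++) {
      observed.add(runOneTestMethod(setup.get(), "flow" + i));
    }
    return observed;
  }
}
```

With three methods, the shared fixture observes counts 1, 2, 3, so a `times(1)` check only passes for whichever method happens to run first; the per-method fixture observes 1, 1, 1 regardless of order. The same shared mutable state is also what gets in the way of running methods in parallel.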
##########
gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java:
##########
@@ -223,6 +223,8 @@ public static GobblinServiceManager createTestGobblinServiceManager(Properties s
DagManager spiedDagManager = spy(gobblinServiceManager.getDagManager());
doNothing().when(spiedDagManager).setActive(anyBoolean());
+ // WARNING: this `spiedDagManager` WILL NOT BE the one used by the `Orchestrator`: its DM has apparently already been
+ // provided to the `Orchestrator` ctor, prior to this replacement here of `GobblinServiceManager.dagManager`
Review Comment:
documenting for posterity, since unfortunately, this was quite troublesome
to debug
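The underlying pitfall is plain Java reference semantics: a collaborator captured by a constructor keeps pointing at the original object even after the owner's field is reassigned. Below is a minimal sketch using hypothetical stand-in classes (not the real Gobblin types), assuming, as the comment above suggests, that the `Orchestrator` receives its `DagManager` at construction time.

```java
/** Hypothetical stand-in for DagManager. */
class DagManagerStandIn {
  private final String label;

  DagManagerStandIn(String label) {
    this.label = label;
  }

  String label() {
    return label;
  }
}

/** Hypothetical stand-in for Orchestrator: captures its DagManager once, in the ctor. */
class OrchestratorStandIn {
  private final DagManagerStandIn dagManager;

  OrchestratorStandIn(DagManagerStandIn dagManager) {
    this.dagManager = dagManager;
  }

  DagManagerStandIn dagManagerInUse() {
    return dagManager;
  }
}

/** Hypothetical stand-in for GobblinServiceManager, which wires the two together. */
class ServiceManagerStandIn {
  // Field initializers run top to bottom, so the orchestrator captures this first instance.
  DagManagerStandIn dagManager = new DagManagerStandIn("original");
  final OrchestratorStandIn orchestrator = new OrchestratorStandIn(dagManager);
}

class StaleReferenceDemo {
  static String labelSeenByOrchestratorAfterSwap() {
    ServiceManagerStandIn manager = new ServiceManagerStandIn();
    // Analogous to replacing GobblinServiceManager.dagManager with a spy after construction.
    manager.dagManager = new DagManagerStandIn("spy");
    // The orchestrator still holds the reference it was constructed with.
    return manager.orchestrator.dagManagerInUse().label();
  }
}
```

For the spy to take effect, it would have to exist before the `Orchestrator` is constructed, or be injected into the `Orchestrator` itself; which of those the test harness permits isn't shown in this hunk.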
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java:
##########
@@ -57,6 +58,8 @@ public static class NoLongerLeasingStatus extends LeaseAttemptStatus {}
current LeaseObtainedStatus via the completeLease method from a caller
without access to the {@link MultiActiveLeaseArbiter}.
*/
@Data
+ // avoid - warning: Generating equals/hashCode implementation but without a call to superclass, even though this class does not extend java.lang.Object
Review Comment:
another preventable source of compilation warnings
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -229,33 +229,39 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
_log.info("Multi-active scheduler finished handling trigger event: [{}, is: {}, triggerEventTimestamp: {}]",
launchDagAction, isReminderEvent ? "reminder" : "original",
triggerTimestampMillis);
} else {
- TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
- Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
- Optional<Dag<JobExecutionPlan>> compiledDagOptional =
- this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
- flowName, flowMetadata);
-
- if (!compiledDagOptional.isPresent()) {
- Instrumented.markMeter(this.flowOrchestrationFailedMeter);
- return;
- }
- Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
- if (compiledDag.isEmpty()) {
- FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
- Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ try {
+ TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
+ Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
+ Optional<Dag<JobExecutionPlan>> compiledDagOptional =
+ this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
+ flowName, flowMetadata);
+
+ if (!compiledDagOptional.isPresent()) {
+ Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ return;
+ }
+ Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
+ if (compiledDag.isEmpty()) {
+ FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec,
+ flowMetadata);
+ Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
+ SharedFlowMetricsSingleton.CompiledState.FAILED);
+ _log.warn("Cannot determine an executor to run on for Spec: " + spec);
+ return;
+ }
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
- SharedFlowMetricsSingleton.CompiledState.FAILED);
- _log.warn("Cannot determine an executor to run on for Spec: " + spec);
- return;
- }
- sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
- SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
+ SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
- FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, compiledDag);
- flowCompilationTimer.stop(flowMetadata);
+ FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, compiledDag);
+ flowCompilationTimer.stop(flowMetadata);
- // Depending on if DagManager is present, handle execution
- submitFlowToDagManager(flowSpec, compiledDag);
+ // Depending on if DagManager is present, handle execution
+ submitFlowToDagManager(flowSpec, compiledDag);
+ } finally {
+ // remove from the flow catalog, regardless of whether the flow was successfully validated and permitted to exec (concurrently)
+ this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
+ }
Review Comment:
too bad the diff above doesn't make clear that this was solely an indentation
change to add the `try ... finally` here, whose purpose is to ensure
FlowCatalog cleanup, come what may
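The point of the `try ... finally` is that the cleanup runs on every exit path: the normal one, each early `return`, and any propagating exception. A minimal sketch of that shape, with hypothetical names and `String`s standing in for `FlowSpec` and `Dag<JobExecutionPlan>`; for simplicity it records every spec, whereas the real `removeFlowSpecIfAdhoc` presumably only acts on adhoc ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of the else-branch shape in `orchestrate`: early returns inside try, cleanup in finally. */
class TryFinallyCleanupSketch {
  // Stands in for dagManager.removeFlowSpecIfAdhoc(flowSpec): records which specs were cleaned up.
  final List<String> removedSpecs = new ArrayList<>();
  // Stands in for submitFlowToDagManager(flowSpec, compiledDag).
  final List<String> submittedDags = new ArrayList<>();

  /**
   * @param compiledDag an empty Optional models failed validation; an empty string models an empty DAG
   * @param submitFails models an unexpected exception during submission
   */
  void orchestrate(String flowSpec, Optional<String> compiledDag, boolean submitFails) {
    try {
      if (!compiledDag.isPresent()) {
        return; // validation failed or concurrent execution not permitted
      }
      if (compiledDag.get().isEmpty()) {
        return; // no executor could be determined
      }
      if (submitFails) {
        throw new IllegalStateException("submission failed for " + flowSpec);
      }
      submittedDags.add(compiledDag.get());
    } finally {
      // Runs after either early return, after a normal submit, and while an exception propagates.
      removedSpecs.add(flowSpec);
    }
  }
}
```

Without the `finally`, each early `return` (and each exception path) would need its own copy of the cleanup call, which is exactly the kind of branch that is easy to miss.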
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -127,19 +133,31 @@ public void setup() throws Exception {
SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
- this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
- this.topologyCatalog, mockDagManager, Optional.of(logger), mockStatusGenerator,
- Optional.of(mockFlowTriggerHandler), sharedFlowMetricsSingleton, Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
- new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator));
- this.topologyCatalog.addListener(orchestrator);
- this.flowCatalog.addListener(orchestrator);
+ FlowCompilationValidationHelper flowCompilationValidationHelper = new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockFlowStatusGenerator);
+ this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
Review Comment:
renamed to avoid confusion, since I now pass `Optional.absent()` for the
`FlowLaunchHandler`. nothing in the previously existing tests below
strictly validated that code path, so I made this entire `OrchestratorTest`
class specific to the legacy `DagManager` version.
as for the lack of validation of the other code path, that suggests a
missing test: e.g. one verifying that `orchestrate` invokes
`FlowLaunchHandler::handleFlowLaunchTriggerEvent`
cc: @umustafi and @arjun4084346
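A rough sketch of what that missing test would check, written in plain Java with recording fakes in place of Mockito's `verify(...)`. Everything here is hypothetical: the stand-in interfaces, the parameters of `handleFlowLaunchTriggerEvent` (its real signature isn't shown in this PR), and the assumption that `orchestrate` simply branches on whether a launch handler is present. It also uses `java.util.Optional` rather than the Guava `Optional` (with `absent()`) that the code above uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for FlowLaunchHandler; only the method named in the comment is modeled. */
interface LaunchHandlerStandIn {
  void handleFlowLaunchTriggerEvent(String flowName, long triggerTimestampMillis);
}

/** Hypothetical stand-in for the legacy DagManager submission path. */
interface DagSubmitterStandIn {
  void submitFlowToDagManager(String flowName);
}

/** Hypothetical orchestrator: delegates to the launch handler when present, else uses the legacy path. */
class OrchestratorPathSketch {
  private final Optional<LaunchHandlerStandIn> launchHandler;
  private final DagSubmitterStandIn dagSubmitter;

  OrchestratorPathSketch(Optional<LaunchHandlerStandIn> launchHandler, DagSubmitterStandIn dagSubmitter) {
    this.launchHandler = launchHandler;
    this.dagSubmitter = dagSubmitter;
  }

  void orchestrate(String flowName, long triggerTimestampMillis) {
    if (launchHandler.isPresent()) {
      launchHandler.get().handleFlowLaunchTriggerEvent(flowName, triggerTimestampMillis);
    } else {
      dagSubmitter.submitFlowToDagManager(flowName);
    }
  }
}

/** Recording fakes, playing the role Mockito's verify(...) would play in a real test. */
class RecordingFakes {
  final List<String> launchEvents = new ArrayList<>();
  final List<String> dagSubmissions = new ArrayList<>();

  LaunchHandlerStandIn launchHandler() {
    return (flowName, ts) -> launchEvents.add(flowName + "@" + ts);
  }

  DagSubmitterStandIn dagSubmitter() {
    return dagSubmissions::add;
  }
}
```

A real test would assert the same two things against the actual `Orchestrator`: with a handler present, the trigger event is handed to it and nothing goes to the `DagManager`; with `Optional.absent()`, the reverse.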
--
This is an automated message from the Apache Git Service.