phet commented on code in PR #3896:
URL: https://github.com/apache/gobblin/pull/3896#discussion_r1540522168


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -35,38 +36,39 @@
 
 /**
  * Responsible for performing the actual work for a given {@link DagTask} by first initializing its state, performing
- * actions based on the type of {@link DagTask} and finally submitting an event to the executor.
+ * actions based on the type of {@link DagTask}. Submitting events in time is important (PR#3641), hence initialize and

Review Comment:
   unless there's javadoc syntax I'm not familiar with, I'd suggest an actual 
link here - `https://github.com/apache/gobblin/pull/3896`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -35,38 +36,39 @@
 
 /**
  * Responsible for performing the actual work for a given {@link DagTask} by first initializing its state, performing
- * actions based on the type of {@link DagTask} and finally submitting an event to the executor.
+ * actions based on the type of {@link DagTask}. Submitting events in time is important (PR#3641), hence initialize and
+ * act methods submit events as they happen.
  */
 @Alpha
 @Slf4j
 @RequiredArgsConstructor
-public abstract class DagProc<S, T> {
+public abstract class DagProc<S> {

Review Comment:
   looks reasonable to eliminate the `T` param, although it would now be customary to rename `S` to `T`.  please also describe the meaning of the generic type in the javadoc
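A minimal sketch of the suggestion, assuming the rest of the class stays as in the diff: `S` is renamed to `T`, and the type parameter is documented with javadoc's `@param <T>` tag. `DagManagementStateStore` and `DagTask` are reduced to empty, hypothetical stand-ins for the real Gobblin types, `process` is omitted, and `CountingDagProc` is a purely illustrative subclass.

```java
import java.io.IOException;

// Hypothetical, empty stand-ins for the real Gobblin types.
interface DagManagementStateStore {}
interface DagTask {}

/**
 * Responsible for performing the actual work for a given {@link DagTask} by first
 * initializing its state and then performing actions based on the type of {@link DagTask}.
 *
 * @param <T> type of the state that {@link #initialize} loads (for instance, from the
 *            {@link DagManagementStateStore}) and that {@link #act} then operates on
 */
abstract class DagProc<T> {
  protected final DagTask dagTask;

  protected DagProc(DagTask dagTask) {
    this.dagTask = dagTask;
  }

  /** Loads whatever state this proc needs before it can act. */
  protected abstract T initialize(DagManagementStateStore store) throws IOException;

  /** Acts on the state previously returned by {@link #initialize}. */
  protected abstract void act(DagManagementStateStore store, T state) throws IOException;
}

/** Hypothetical subclass: its state is just the number of dag nodes it will act on. */
class CountingDagProc extends DagProc<Integer> {
  int actedOn = -1;

  CountingDagProc(DagTask dagTask) {
    super(dagTask);
  }

  @Override
  protected Integer initialize(DagManagementStateStore store) {
    return 3; // hard-coded for illustration only
  }

  @Override
  protected void act(DagManagementStateStore store, Integer nodeCount) {
    this.actedOn = nodeCount;
  }
}
```

With the parameter documented this way, a reader of a subclass such as `DagProc<Integer>` can tell from the class javadoc alone what the `Integer` represents.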



##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsJobStatusRetrieverTest.java:
##########
@@ -51,6 +51,7 @@ public void setUp() throws Exception {
         ConfigValueFactory.fromAnyRef(stateStoreDir));
     this.jobStatusRetriever = new FsJobStatusRetriever(config, mock(MultiContextIssueRepository.class));
     this.mysqlDagActionStore = mock(MysqlDagActionStore.class);
+    // job status monitor stores dag action into dag action store, so need to mock that behavior to avoid NPE

Review Comment:
   I did initially suggest adding such a comment, but as I just mentioned, my opinion has evolved.  the need for mocking here seems a code smell, indicating that `KJSM::addJobStatusToStateStore` is doing too much and ought to be reworked.  following that, neither the mocking nor the comment would be required



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java:
##########
@@ -244,11 +251,12 @@ public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOExce
   }
 
   @Override
-  public Optional<JobStatus> getJobStatus(String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName) {
-    Iterator<JobStatus> jobStatusIterator =
-        jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
+  public Optional<JobStatus> getJobStatus(DagNodeId dagNodeId) {
+    Iterator<JobStatus> jobStatusIterator = this.jobStatusRetriever.getJobStatusesForFlowExecution(dagNodeId.getFlowName(),
+        dagNodeId.getFlowGroup(), dagNodeId.getFlowExecutionId(), dagNodeId.getJobName(), dagNodeId.getJobGroup());
 
     if (jobStatusIterator.hasNext()) {
+      // there must exist exactly one job status for a dag node id

Review Comment:
   may be worth adding either an assert or at least a `log.error` if this 
presumption is violated.  what do you think?
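One way the `log.error` variant could look, sketched as a standalone, hypothetical helper (`SingleJobStatus.firstAndOnly` is not an existing Gobblin method). The type parameter `J` stands in for `JobStatus`, and `java.util.logging`'s `Logger.severe` stands in for the `log.error` that Lombok's `@Slf4j` would provide, so the snippet compiles on its own.

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.logging.Logger;

/** Hypothetical helper: returns the job status for a dag node id, flagging duplicates. */
final class SingleJobStatus {
  private static final Logger LOG = Logger.getLogger(SingleJobStatus.class.getName());

  private SingleJobStatus() {}

  /**
   * @param statuses  job statuses retrieved for one dag node id (expected: at most one)
   * @param dagNodeId used only for the error message
   * @return the first status, or empty if there is none
   */
  static <J> Optional<J> firstAndOnly(Iterator<J> statuses, String dagNodeId) {
    if (!statuses.hasNext()) {
      return Optional.empty();
    }
    J first = statuses.next();
    if (statuses.hasNext()) {
      // The "exactly one job status per dag node id" presumption is violated: report it,
      // but still return the first status (a stricter variant could throw instead).
      LOG.severe("More than one job status found for dag node id " + dagNodeId);
    }
    return Optional.of(first);
  }
}
```

Replacing the log call with an `IllegalStateException` would give the stricter, assert-like behavior; logging keeps the service running while still surfacing the violation.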



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -193,7 +194,7 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
         org.apache.gobblin.configuration.State jobStatus = parseJobStatus(gobblinTrackingEvent);
         if (jobStatus != null) {
           try (Timer.Context context = getMetricContext().timer(GET_AND_SET_JOB_STATUS).time()) {
-            addJobStatusToStateStore(jobStatus, this.stateStore, this.eventProducer, this.dagActionStore);
+            addJobStatusToStateStore(jobStatus, this.stateStore, this.eventProducer, this.dagActionStore, this.dagProcEngineEnabled);

Review Comment:
   rather than passing one more param to this `static` that exists to be `@VisibleForTesting`, I suggested passing two fewer params, `this.eventProducer` and `this.dagActionStore`, and having it return a non-empty `Optional<JobStatus>` only when the job status becomes final.  this would make the `static` much more cohesive (and testable!), since it would solely update the job state store, without leaving side-effects anywhere else
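A rough sketch of the suggested shape, built on heavily simplified, hypothetical stand-ins: `ExecutionStatus`, `JobStatus`, and an in-memory `Map` take the place of the real Gobblin types and state store, and `JobStatusUpdater` merely holds the `static`. The method only updates the store and returns a non-empty `Optional` when the job transitions into a final state. Treating a repeated final status as a non-transition is an assumption of this sketch.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified stand-in for a job's execution status. */
enum ExecutionStatus {
  PENDING, RUNNING, COMPLETE, FAILED, CANCELLED;

  boolean isFinal() {
    return this == COMPLETE || this == FAILED || this == CANCELLED;
  }
}

/** Hypothetical, simplified job status keyed by job name. */
final class JobStatus {
  final String jobName;
  final ExecutionStatus status;

  JobStatus(String jobName, ExecutionStatus status) {
    this.jobName = jobName;
    this.status = status;
  }
}

final class JobStatusUpdater {
  private JobStatusUpdater() {}

  /**
   * Only updates the (here, in-memory) job state store: no event producer, no dag action store.
   *
   * @return the new status if the job just transitioned into a final state, otherwise empty
   */
  static Optional<JobStatus> addJobStatusToStateStore(JobStatus jobStatus, Map<String, ExecutionStatus> stateStore) {
    ExecutionStatus previous = stateStore.put(jobStatus.jobName, jobStatus.status);
    boolean becameFinal = jobStatus.status.isFinal() && (previous == null || !previous.isFinal());
    return becameFinal ? Optional.of(jobStatus) : Optional.empty();
  }
}
```

The non-static caller in `KafkaJobStatusMonitor` would then own the side effects, for example via `ifPresent(...)` on the result to emit the event or, when `dagProcEngineEnabled` is set, to record the dag action. Per the earlier comment, a test of the `static` would then need no `MysqlDagActionStore` mock.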



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -35,38 +36,39 @@
 
 /**
  * Responsible for performing the actual work for a given {@link DagTask} by first initializing its state, performing
- * actions based on the type of {@link DagTask} and finally submitting an event to the executor.
+ * actions based on the type of {@link DagTask}. Submitting events in time is important (PR#3641), hence initialize and
+ * act methods submit events as they happen.
  */
 @Alpha
 @Slf4j
 @RequiredArgsConstructor
-public abstract class DagProc<S, T> {
+public abstract class DagProc<S> {
   protected final DagTask dagTask;
+  @Getter protected final DagManager.DagId dagId;
+  @Getter protected final DagNodeId dagNodeId;
   protected static final MetricContext metricContext = Instrumented.getMetricContext(new State(), DagProc.class);
   protected static final EventSubmitter eventSubmitter = new EventSubmitter.Builder(
       metricContext, "org.apache.gobblin.service").build();
 
-  public final void process(DagManagementStateStore dagManagementStateStore) throws IOException {
-    S state = initialize(dagManagementStateStore);   // todo - retry
-    T result = act(dagManagementStateStore, state);   // todo - retry
-    commit(dagManagementStateStore, result);   // todo - retry
-    log.info("{} successfully concluded actions for dagId : {}", getClass().getSimpleName(), getDagId());
-  }
-
-  protected DagManager.DagId getDagId() {
-    return this.dagTask.getDagId();
+  public DagProc(DagTask dagTask) {
+    this.dagTask = dagTask;
+    this.dagId = this.dagTask.getDagId();
+    this.dagNodeId = this.dagTask.getDagNodeId();
   }
 
-  protected DagNodeId getDagNodeId() {
-    return this.dagTask.getDagNodeId();
+  public final void process(DagManagementStateStore dagManagementStateStore) throws IOException {
+    S state = initialize(dagManagementStateStore);   // todo - retry
+    act(dagManagementStateStore, state);   // todo - retry
+    commit(dagManagementStateStore);   // todo - retry
+    log.info("{} successfully concluded actions for dagId : {}", getClass().getSimpleName(), this.dagId);

Review Comment:
   I don't believe we need `commit`.  do you see a purpose for it? 
   
   nit: "...concluded *processing* for dagId..."
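A minimal sketch of `process` without `commit`, assuming (per the updated javadoc) that `initialize` and `act` already submit their events as they happen, and using the suggested "concluded processing" wording. The dag id is simplified to a `String`, `java.util.logging` replaces Lombok's `@Slf4j`, and `RecordingDagProc` is a hypothetical subclass that only records the order of its steps.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical, empty stand-in for the real Gobblin type.
interface DagManagementStateStore {}

abstract class DagProc<T> {
  private static final Logger LOG = Logger.getLogger(DagProc.class.getName());
  protected final String dagId; // simplified: the real field is a DagManager.DagId

  protected DagProc(String dagId) {
    this.dagId = dagId;
  }

  /** Template method with no separate commit step: initialize and act submit their own events. */
  public final void process(DagManagementStateStore store) throws IOException {
    T state = initialize(store);
    act(store, state);
    LOG.info(getClass().getSimpleName() + " successfully concluded processing for dagId : " + this.dagId);
  }

  protected abstract T initialize(DagManagementStateStore store) throws IOException;

  protected abstract void act(DagManagementStateStore store, T state) throws IOException;
}

/** Hypothetical subclass that records the order of its steps, for illustration. */
class RecordingDagProc extends DagProc<String> {
  final List<String> steps = new ArrayList<>();

  RecordingDagProc(String dagId) {
    super(dagId);
  }

  @Override
  protected String initialize(DagManagementStateStore store) {
    steps.add("initialize");
    return "state";
  }

  @Override
  protected void act(DagManagementStateStore store, String state) {
    steps.add("act:" + state);
  }
}
```

If a concrete proc later turns out to need a post-act step, it can be added at the end of that proc's `act` rather than reintroduced as a generic `commit` on every subclass.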



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
