phet commented on code in PR #3800:
URL: https://github.com/apache/gobblin/pull/3800#discussion_r1358869580


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -242,7 +242,6 @@ private void runRetentionOnArbitrationTable() {
     ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
     Runnable retentionTask = () -> {
       try {
-        Thread.sleep(10000);

Review Comment:
   not a concern, just wondering: did this `Thread.sleep` become redundant, given the
   `ScheduledThreadPoolExecutor`?
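
For context on the question above, here is a minimal sketch of how a scheduled executor can take over the job of a leading `Thread.sleep`: the initial-delay argument of `scheduleAtFixedRate` postpones the first run. The class name, the no-op task, and the delay values are hypothetical and are not taken from the PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the retention scheduling; not the code from the PR.
class RetentionSchedulingSketch {

  /** Schedules a no-op task and returns how many milliseconds passed before its first run. */
  static long millisUntilFirstRun(long initialDelayMillis, long periodMillis) {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    CountDownLatch firstRun = new CountDownLatch(1);
    Runnable retentionTask = firstRun::countDown; // no Thread.sleep inside the task
    long start = System.nanoTime();
    // The executor itself waits initialDelayMillis before the first execution.
    executor.scheduleAtFixedRate(retentionTask, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
    try {
      if (!firstRun.await(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("task did not run within 10 seconds");
      }
      return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for the first run", e);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("first run after ~" + millisUntilFirstRun(50, 1000) + " ms");
  }
}
```

If the removed sleep only existed to delay the first retention pass, an initial delay like this would cover it. Whether that was its purpose is for the PR author to confirm.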



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java:
##########
@@ -43,14 +43,17 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.message.processed";
   public static final String GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
       ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specstoreMonitor.produce.to.consume.delay";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.kills.invoked";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.message.processed";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.resumes.invoked";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.flows.launched";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.unexpected.errors";
-  public static final String
-      GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.produce.to.consume.delay";
+  public static final String DAG_ACTION_STORE_MONITOR_PREFIX = "dagActionStoreMonitor";

Review Comment:
   nit: this is no longer a *prefix*... and why repeat
   `ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "."` so many times anyway?
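
To illustrate the nit, here is a rough sketch of a constant that really is a prefix, with the service prefix and the `.` separators joined once. The class name and the `"GobblinService"` value are assumptions made for the example; the metric suffixes are the ones visible in the diff above.

```java
// Hypothetical sketch; constant values are assumptions, not the PR's final code.
class MetricNamePrefixSketch {
  // Stand-in for ServiceMetricNames.GOBBLIN_SERVICE_PREFIX (value assumed for illustration).
  static final String GOBBLIN_SERVICE_PREFIX = "GobblinService";

  // A true prefix: service prefix, component name, and separators are concatenated in one place.
  static final String DAG_ACTION_STORE_MONITOR_PREFIX = GOBBLIN_SERVICE_PREFIX + ".dagActionStoreMonitor.";

  // Each metric then appends only its own suffix.
  static final String KILLS_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + "kills.invoked";
  static final String RESUMES_INVOKED = DAG_ACTION_STORE_MONITOR_PREFIX + "resumes.invoked";
  static final String FLOWS_LAUNCHED = DAG_ACTION_STORE_MONITOR_PREFIX + "flows.launched";

  public static void main(String[] args) {
    System.out.println(KILLS_INVOKED);
    System.out.println(RESUMES_INVOKED);
    System.out.println(FLOWS_LAUNCHED);
  }
}
```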



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -130,30 +132,34 @@ protected void processMessage(DecodeableKafkaRecord message) {
     String changeIdentifier = tid + key;
     if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, dagActionsSeenCache, operation,
         produceTimestamp.toString())) {
+      this.messageFilteredOutMeter.mark();
       return;
     }
 
+    // Used to easily log information to identify the dag action
+    DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
+        dagActionType);
+
     // We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of
     // {@link DagActionStore.FlowActionType} flow requests that have to be processed. DELETEs require no action.
     try {
       if (operation.equals("INSERT")) {
         if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
-          log.info("Received insert dag action and about to send resume flow request");
+          log.info("Received insert dag action and about to send resume flow request for: {}", dagAction);

Review Comment:
   nit: too conversational.  how about:
   ```
   log.info("DagAction change ({}): {}", operation, dagAction)
   ```
   (i.e. won't the `resume/kill/launch` be logged as `dagAction.dagActionType`)?



##########
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java:
##########
@@ -43,6 +43,11 @@ public class ServiceMetricNames {
   public static final String FLOW_TRIGGER_HANDLER_JOB_DOES_NOT_EXIST_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".jobDoesNotExistInScheduler";
   public static final String FLOW_TRIGGER_HANDLER_FAILED_TO_SET_REMINDER_COUNT = GOBBLIN_SERVICE_PREFIX + "." + FLOW_TRIGGER_HANDLER_PREFIX + ".failedToSetReminderCount";
 
+  // Dag Action Handling Related Metrics
+  public static final String DAG_ACTION_HANDLING_PREFIX = "dagActionHandling";

Review Comment:
   wondering... couldn't this be a `dagManager.` metric?  
`dagManager.failedLaunchEventsOn...`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -510,13 +510,18 @@ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) {
       this.dagActionStore.get().deleteDagAction(launchAction);
     } catch (URISyntaxException e) {
       log.warn("Could not create URI object for flowId {} due to exception {}", flowId, e.getMessage());
+      this.dagManagerMetrics.incrementFailedLaunchCount();
     } catch (SpecNotFoundException e) {
       log.warn("Spec not found for flowId {} due to exception {}", flowId, e.getMessage());
+      this.dagManagerMetrics.incrementFailedLaunchCount();
     } catch (IOException e) {
       log.warn("Failed to add Job Execution Plan for flowId {} OR delete dag action from dagActionStore (check "
           + "stacktrace) due to exception {}", flowId, e.getMessage());
+      this.dagManagerMetrics.incrementFailedLaunchCount();
     } catch (InterruptedException e) {
-      log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {}. Exception: ", flowId, e);
+      log.warn("SpecCompiler failed to reach healthy state before compilation of flowId {} due to exception {}", flowId,
+          e);

Review Comment:
   I may be misremembering, and pursuing this needlessly, but my expectation is
   that a stack trace would only be written when calling this form:
   ```
   Logger::warn(String, Throwable)
   ```
   if you call the form:
   ```
   Logger::warn(String, Object, Object) // aka. Logger::warn(String, Object...)
   ```
   are you certain it will print the stack trace when the last arg is a `Throwable`
   and there's no corresponding `{}` remaining for it in the initial `String` arg?
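
The question comes down to counting placeholders against arguments. The sketch below is a hypothetical helper, not SLF4J code, and it does not reproduce what any logging backend does. It only reports whether a trailing `Throwable` is left over after every `{}` has been matched with an argument. How a given SLF4J version or binding treats the other case should be checked against the backend actually in use.

```java
// Hypothetical helper for illustration only; it does not call or mirror SLF4J internals.
class PlaceholderCountSketch {

  /** Counts "{}" placeholders in a message pattern (escaped placeholders are not handled). */
  static int countPlaceholders(String pattern) {
    int count = 0;
    int from = 0;
    while ((from = pattern.indexOf("{}", from)) >= 0) {
      count++;
      from += 2;
    }
    return count;
  }

  /** True when the last argument is a Throwable and no "{}" is left to consume it. */
  static boolean trailingThrowableLeftOver(String pattern, Object... args) {
    if (args.length == 0 || !(args[args.length - 1] instanceof Throwable)) {
      return false;
    }
    return countPlaceholders(pattern) < args.length;
  }

  public static void main(String[] args) {
    Exception e = new InterruptedException("interrupted");
    // Old message from the diff: one placeholder, two arguments.
    System.out.println(trailingThrowableLeftOver(
        "SpecCompiler failed to reach healthy state before compilation of flowId {}. Exception: ", "flowId", e));
    // New message from the diff: two placeholders, two arguments.
    System.out.println(trailingThrowableLeftOver(
        "SpecCompiler failed to reach healthy state before compilation of flowId {} due to exception {}", "flowId", e));
  }
}
```

By this count, the removed line leaves the exception without a placeholder, while the added line gives it one.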



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -308,7 +307,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
         if (eventTimeMillis == dbEventTimestamp.getTime()) {
           // TODO: change this to a debug after fixing issue
           log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - dbEventTimeMillis: {} - Reminder event time"
-                  + "is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original",
+                  + " is the same as db event.", flowAction, isReminderEvent ? "reminder" : "original",

Review Comment:
   nit: I prefer putting the space at the end of the preceding string literal


