abhishekmjain commented on code in PR #4136:
URL: https://github.com/apache/gobblin/pull/4136#discussion_r2300178657


##########
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java:
##########
@@ -96,6 +97,8 @@ public class ServiceMetricNames {
   public static final String DAG_ACTIONS_ACT_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActFailed.";
   public static final String DAG_ACTIONS_ACT_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActSucceeded.";
   public static final String DAG_ACTIONS_CONCLUDE_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFailed.";
+  public static final String DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFlowSpecRemovalSucceeded";

Review Comment:
   I see a `.` at the end of each existing metric name. Is it not needed for the
new ones we add?
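
   A minimal sketch of why the trailing `.` may matter, assuming the registration helper builds each meter name by concatenating the constant directly with the `DagActionType` name (an assumption; the helper's body is not shown in this diff). The class name, the `PREFIX` value, and `metricNameFor` are hypothetical stand-ins, not Gobblin code:

```java
public class MetricNameDemo {
    // Hypothetical stand-in for ServiceMetricNames.DAG_PROCESSING_ENGINE_PREFIX.
    static final String PREFIX = "dagProcessingEngine.";

    // Assumed naming scheme: constant + action type name, with no separator added.
    static String metricNameFor(String baseName, String dagActionType) {
        return baseName + dagActionType;
    }

    public static void main(String[] args) {
        // Existing constant ends with '.', so the action type becomes its own name segment.
        System.out.println(metricNameFor(PREFIX + "dagActionsConcludeFailed.", "KILL"));
        // New constant has no trailing '.', so the action type is fused onto the last segment.
        System.out.println(metricNameFor(PREFIX + "dagActionsConcludeFlowSpecRemovalSucceeded", "KILL"));
    }
}
```

   Under that assumption the first name ends in `dagActionsConcludeFailed.KILL`, while the second ends in `dagActionsConcludeFlowSpecRemovalSucceededKILL`, which would not group with the other per-action-type metrics.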



##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java:
##########
@@ -248,8 +251,22 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr
     // Return conflict and take no action if flowSpec has already been created
     if (this.flowCatalog.exists(flowSpec.getUri())) {
       log.warn("FlowSpec with URI {} already exists, no action will be taken", flowSpec.getUri());
-      return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT,
-          "FlowSpec with URI " + flowSpec.getUri() + " already exists, no action will be taken"));
+      try {
+        FlowSpec storedFlowSpec = this.flowCatalog.getSpecs(flowSpec.getUri());
+        if (!storedFlowSpec.isScheduled()) {
+          log.error("FlowSpec Already Exists As Adhoc Flow  with URI: " + flowSpec.getUri());

Review Comment:
   This is not an error scenario for the service; there could be multiple
genuine attempts. WARN should be fine IMO.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java:
##########
@@ -163,6 +167,14 @@ public void markDagActionsConclude(DagActionStore.DagActionType dagActionType, b
     }
   }
 
+  public void markDagActionsConflowFlowSpecRemoval(DagActionStore.DagActionType dagActionType, boolean succeeded) {
+    if (succeeded) {
+      updateMetricForDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType, dagActionType);
+    } else {

Review Comment:
   Is the succeeded scenario going to be called?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java:
##########
@@ -163,6 +167,14 @@ public void markDagActionsConclude(DagActionStore.DagActionType dagActionType, b
     }
   }
 
+  public void markDagActionsConflowFlowSpecRemoval(DagActionStore.DagActionType dagActionType, boolean succeeded) {

Review Comment:
   `markDagActionsConflowFlowSpecRemoval` - is `Conflow` a typo?



##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java:
##########
@@ -100,6 +101,8 @@ public FlowConfigsV2ResourceHandler(@Named(InjectionNames.SERVICE_NAME) String s
         MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.DELETE_FLOW_METER));
     this.runImmediatelyFlow = metricContext.contextAwareMeter(
         MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.RUN_IMMEDIATELY_FLOW_METER));
+    this.flowSpecExistsForAdhocFlow = metricContext.contextAwareMeter(
+        MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames. RUN_IMMEDIATELY_FLOW_METER));

Review Comment:
   Why are we reusing `RUN_IMMEDIATELY_FLOW_METER` for the new
`flowSpecExistsForAdhocFlow` meter?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java:
##########
@@ -89,6 +91,8 @@ public void registerAllMetrics() {
     registerMetricForEachDagActionType(this.dagActionsActSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_ACT_SUCCEEDED);
     registerMetricForEachDagActionType(this.dagActionsConcludeFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FAILED);
     registerMetricForEachDagActionType(this.dagActionsConcludeSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_SUCCEEDED);
+    registerMetricForEachDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED);

Review Comment:
   `Meter` is misspelled as `Metre` in
`dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType`.


