khandelwal-prateek commented on code in PR #4136:
URL: https://github.com/apache/gobblin/pull/4136#discussion_r2304557997


##########
gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java:
##########
@@ -96,6 +97,8 @@ public class ServiceMetricNames {
   public static final String DAG_ACTIONS_ACT_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActFailed.";
   public static final String DAG_ACTIONS_ACT_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActSucceeded.";
   public static final String DAG_ACTIONS_CONCLUDE_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFailed.";
+  public static final String DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFlowSpecRemovalSucceeded.";

Review Comment:
   Success is the expected/default case, so the success metric doesn't give us
any additional signal. The failure metric is actionable, and tracking failures
should be sufficient here. I would suggest dropping the success metric for this
one, as it just adds noise.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java:
##########
@@ -54,7 +54,15 @@ public final boolean conclude() {
         FlowSpec flowSpec =
             this.dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(dagId.getFlowId()));
         if (!flowSpec.isScheduled()) {
-          dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new Properties(), false);
+          try {
+            //This can throw Runtime, IllegalState and IO Exceptions which are not caught here.
+            dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new Properties(), false);
+          } catch (Exception e) {
+            super.dagProcEngineMetrics.markDagActionsConcludeFlowSpecRemoval(this.dagAction.getDagActionType(), false);
+            log.error("Failed to Remove The FlowSpec For Adhoc Flow with URI: " + flowSpec.getUri());
+            return false;

Review Comment:
   Earlier, the RuntimeException was not caught here, so it propagated to
DagProcessingEngine, which marked
`dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();`.
Now we are swallowing the exception, so it will no longer be handled in
DagProcessingEngine. We should re-throw the exception here.
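
   A minimal sketch of the re-throw pattern described above, under stated
assumptions: `SpecRemover`, `conclude`, `process`, and the two counters are
hypothetical stand-ins for illustration, not Gobblin's actual classes or meters.
The inner method records the removal failure and re-throws, so the outer caller
still sees the exception and can mark its own meter.

```java
import java.util.concurrent.atomic.AtomicLong;

public class RethrowSketch {
  // Hypothetical counters standing in for the real metrics.
  static final AtomicLong removalFailures = new AtomicLong();
  static final AtomicLong processingExceptions = new AtomicLong();

  /** Hypothetical stand-in for the call that removes a FlowSpec. */
  interface SpecRemover {
    void remove(String uri) throws Exception;
  }

  /** Marks the removal failure, then re-throws instead of swallowing. */
  static boolean conclude(SpecRemover remover, String uri) {
    try {
      remover.remove(uri);
    } catch (RuntimeException e) {
      removalFailures.incrementAndGet();
      throw e; // unchecked exceptions propagate unchanged
    } catch (Exception e) {
      removalFailures.incrementAndGet();
      // Checked exceptions are wrapped so the caller's handler still fires.
      throw new RuntimeException("Failed to remove FlowSpec with URI: " + uri, e);
    }
    return true;
  }

  /** Hypothetical outer engine loop that owns the processing-exception meter. */
  static boolean process(SpecRemover remover, String uri) {
    try {
      return conclude(remover, uri);
    } catch (RuntimeException e) {
      processingExceptions.incrementAndGet();
      return false;
    }
  }

  public static void main(String[] args) {
    boolean ok = process(uri -> { throw new IllegalStateException("boom"); }, "flow:/group/name");
    System.out.println("ok=" + ok
        + " removalFailures=" + removalFailures.get()
        + " processingExceptions=" + processingExceptions.get());
  }
}
```

   With this shape both meters are marked for a single failure, which is the
behavior the comment asks to preserve.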



##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java:
##########
@@ -248,8 +251,22 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr
     // Return conflict and take no action if flowSpec has already been created
     if (this.flowCatalog.exists(flowSpec.getUri())) {
       log.warn("FlowSpec with URI {} already exists, no action will be taken", flowSpec.getUri());
-      return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT,
-          "FlowSpec with URI " + flowSpec.getUri() + " already exists, no action will be taken"));
+      try {
+        FlowSpec storedFlowSpec = this.flowCatalog.getSpecs(flowSpec.getUri());
+        if (!storedFlowSpec.isScheduled()) {
+          log.warn("FlowSpec Already Exists As Adhoc Flow  with URI: " + flowSpec.getUri());
+          if (!flowSpec.isScheduled()) {
+            flowSpecExistsForAdhocFlow.mark();
+          }
+        } else {
+          log.warn("FlowSpec Already Exists As Scheduled Flow with URI: " + flowSpec.getUri());

Review Comment:
   Please move the log from line 253 to here, since we are now logging twice.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java:
##########
@@ -54,7 +54,15 @@ public final boolean conclude() {
         FlowSpec flowSpec =
             this.dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(dagId.getFlowId()));
         if (!flowSpec.isScheduled()) {
-          dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new Properties(), false);
+          try {
+            //This can throw Runtime, IllegalState and IO Exceptions which are not caught here.
+            dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new Properties(), false);
+          } catch (Exception e) {
+            super.dagProcEngineMetrics.markDagActionsConcludeFlowSpecRemoval(this.dagAction.getDagActionType(), false);
+            log.error("Failed to Remove The FlowSpec For Adhoc Flow with URI: " + flowSpec.getUri());
+            return false;
+          }
+          super.dagProcEngineMetrics.markDagActionsConcludeFlowSpecRemoval(this.dagAction.getDagActionType(), true);

Review Comment:
   IMO, we don't need to mark the metric on success.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java:
##########
@@ -54,7 +54,15 @@ public final boolean conclude() {
         FlowSpec flowSpec =
             this.dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(dagId.getFlowId()));
         if (!flowSpec.isScheduled()) {
-          dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new Properties(), false);
+          try {
+            //This can throw Runtime, IllegalState and IO Exceptions which are not caught here.
+            dagManagementStateStore.removeFlowSpec(flowSpec.getUri(), new Properties(), false);
+          } catch (Exception e) {
+            super.dagProcEngineMetrics.markDagActionsConcludeFlowSpecRemoval(this.dagAction.getDagActionType(), false);
+            log.error("Failed to Remove The FlowSpec For Adhoc Flow with URI: " + flowSpec.getUri());

Review Comment:
   Please log the exception as well using `log.error(..., e)`, so that the
stack trace is captured.
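
   A small sketch of why passing the exception matters. It uses
`java.util.logging` from the JDK as a stand-in for the `log.error(..., e)`
call named above, purely so the example is self-contained; the class, method,
and handler names are hypothetical. The capturing handler shows that only the
second call attaches the throwable (and therefore its stack trace) to the log
record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class LogWithCauseSketch {
  static final Logger LOG = Logger.getLogger("LogWithCauseSketch");

  /** Hypothetical handler that keeps records in memory for inspection. */
  static class CapturingHandler extends Handler {
    final List<LogRecord> records = new ArrayList<>();
    @Override public void publish(LogRecord record) { records.add(record); }
    @Override public void flush() { }
    @Override public void close() { }
  }

  /** Message only: the exception and its stack trace are lost. */
  static void logMessageOnly(String uri, Exception e) {
    LOG.log(Level.SEVERE, "Failed to remove the FlowSpec for adhoc flow with URI: " + uri);
  }

  /** Message plus throwable: the stack trace travels with the record. */
  static void logWithCause(String uri, Exception e) {
    LOG.log(Level.SEVERE, "Failed to remove the FlowSpec for adhoc flow with URI: " + uri, e);
  }

  public static void main(String[] args) {
    CapturingHandler handler = new CapturingHandler();
    LOG.addHandler(handler);
    LOG.setUseParentHandlers(false); // keep the console quiet for the demo

    Exception boom = new IllegalStateException("boom");
    logMessageOnly("flow:/group/name", boom);
    logWithCause("flow:/group/name", boom);

    for (LogRecord record : handler.records) {
      System.out.println("thrown attached: " + (record.getThrown() != null));
    }
  }
}
```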



##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java:
##########
@@ -248,8 +251,22 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr
     // Return conflict and take no action if flowSpec has already been created
     if (this.flowCatalog.exists(flowSpec.getUri())) {
       log.warn("FlowSpec with URI {} already exists, no action will be taken", flowSpec.getUri());
-      return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT,
-          "FlowSpec with URI " + flowSpec.getUri() + " already exists, no action will be taken"));
+      try {
+        FlowSpec storedFlowSpec = this.flowCatalog.getSpecs(flowSpec.getUri());
+        if (!storedFlowSpec.isScheduled()) {
+          log.warn("FlowSpec Already Exists As Adhoc Flow  with URI: " + flowSpec.getUri());
+          if (!flowSpec.isScheduled()) {
+            flowSpecExistsForAdhocFlow.mark();
+          }
+        } else {
+          log.warn("FlowSpec Already Exists As Scheduled Flow with URI: " + flowSpec.getUri());
+        }
+      } catch (SpecNotFoundException e) {
+        log.error("Error Retrieving FLow For Existing Flow With URI: " + flowSpec.getUri());

Review Comment:
   This is not required, since the if block has already checked for the
existence of the flowSpec.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.