abhishekmjain commented on code in PR #4136: URL: https://github.com/apache/gobblin/pull/4136#discussion_r2300178657
########## gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/ServiceMetricNames.java: ########## @@ -96,6 +97,8 @@ public class ServiceMetricNames { public static final String DAG_ACTIONS_ACT_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActFailed."; public static final String DAG_ACTIONS_ACT_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsActSucceeded."; public static final String DAG_ACTIONS_CONCLUDE_FAILED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFailed."; + public static final String DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED = DAG_PROCESSING_ENGINE_PREFIX + "dagActionsConcludeFlowSpecRemovalSucceeded"; Review Comment: I see a `.` at the end of each metric name, is it not needed for the new ones we add? ########## gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java: ########## @@ -248,8 +251,22 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr // Return conflict and take no action if flowSpec has already been created if (this.flowCatalog.exists(flowSpec.getUri())) { log.warn("FlowSpec with URI {} already exists, no action will be taken", flowSpec.getUri()); - return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT, - "FlowSpec with URI " + flowSpec.getUri() + " already exists, no action will be taken")); + try { + FlowSpec storedFlowSpec = this.flowCatalog.getSpecs(flowSpec.getUri()); + if (!storedFlowSpec.isScheduled()) { + log.error("FlowSpec Already Exists As Adhoc Flow with URI: " + flowSpec.getUri()); Review Comment: This is not an error scenario for service, there could be genuine multiple attempts. WARN should be fine IMO. ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java: ########## @@ -163,6 +167,14 @@ public void markDagActionsConclude(DagActionStore.DagActionType dagActionType, b } } + public void markDagActionsConflowFlowSpecRemoval(DagActionStore.DagActionType dagActionType, boolean succeeded) { + if (succeeded) { + updateMetricForDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType, dagActionType); + } else { Review Comment: is succeeded scenario going to be called? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java: ########## @@ -163,6 +167,14 @@ public void markDagActionsConclude(DagActionStore.DagActionType dagActionType, b } } + public void markDagActionsConflowFlowSpecRemoval(DagActionStore.DagActionType dagActionType, boolean succeeded) { Review Comment: `markDagActionsConflowFlowSpecRemoval` - is conflow a typo? ########## gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java: ########## @@ -100,6 +101,8 @@ public FlowConfigsV2ResourceHandler(@Named(InjectionNames.SERVICE_NAME) String s MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.DELETE_FLOW_METER)); this.runImmediatelyFlow = metricContext.contextAwareMeter( MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.RUN_IMMEDIATELY_FLOW_METER)); + this.flowSpecExistsForAdhocFlow = metricContext.contextAwareMeter( + MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames. RUN_IMMEDIATELY_FLOW_METER)); Review Comment: why are we reusing `RUN_IMMEDIATELY_FLOW_METER` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagProcessingEngineMetrics.java: ########## @@ -89,6 +91,8 @@ public void registerAllMetrics() { registerMetricForEachDagActionType(this.dagActionsActSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_ACT_SUCCEEDED); registerMetricForEachDagActionType(this.dagActionsConcludeFailedMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FAILED); registerMetricForEachDagActionType(this.dagActionsConcludeSucceededMeterByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_SUCCEEDED); + registerMetricForEachDagActionType(this.dagActionsConcludeFlowSpecRemovalSucceededMetreByDagActionType, ServiceMetricNames.DAG_ACTIONS_CONCLUDE_FLOW_SPEC_REMOVAL_SUCCEEDED); Review Comment: meter spelling is incorrect -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org