This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 70e480fe7 Consistly handle Rest.li /flowexecutions KILL and RESUME
actions (#3830)
70e480fe7 is described below
commit 70e480fe74ff37f6d7cdacbde8945095a55169ff
Author: Kip Kohn <[email protected]>
AuthorDate: Mon Nov 20 14:22:30 2023 -0800
Consistly handle Rest.li /flowexecutions KILL and RESUME actions (#3830)
---
...lowExecutionResourceHandlerWithWarmStandby.java | 60 ++++++++--------------
1 file changed, 22 insertions(+), 38 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
index 0b5d1cdc7..f391e8fe7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -29,9 +29,11 @@ import java.io.IOException;
import java.sql.SQLException;
import javax.inject.Named;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
+import org.apache.gobblin.service.FlowStatusId;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.helix.HelixManager;
@@ -46,56 +48,38 @@ public class
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends G
this.dagActionStore = dagActionStore;
}
+ @Override
+ public void resume(ComplexResourceKey<FlowStatusId, EmptyRecord> key) {
+ FlowStatusId id = key.getKey();
+ addDagAction(id.getFlowGroup(), id.getFlowName(), id.getFlowExecutionId(),
DagActionStore.FlowActionType.RESUME);
+ }
@Override
- public void
resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, EmptyRecord>
key) {
- String flowGroup = key.getKey().getFlowGroup();
- String flowName = key.getKey().getFlowName();
- Long flowExecutionId = key.getKey().getFlowExecutionId();
+ public UpdateResponse
delete(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, EmptyRecord>
key) {
+ FlowStatusId id = key.getKey();
+ addDagAction(id.getFlowGroup(), id.getFlowName(), id.getFlowExecutionId(),
DagActionStore.FlowActionType.KILL);
+ return new UpdateResponse(HttpStatus.S_200_OK);
+ }
+
+ /** NOTE: may throw {@link RestLiServiceException}, see:
https://linkedin.github.io/rest.li/user_guide/restli_server#returning-errors */
+ protected void addDagAction(String flowGroup, String flowName, Long
flowExecutionId, DagActionStore.FlowActionType actionType) {
try {
// If an existing resume request is still pending then do not accept
this request
- if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
- this.prepareError("There is already a pending RESUME action for this
flow. Please wait to resubmit and wait "
+ if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString(), actionType)) {
+ this.throwErrorResponse("There is already a pending " + actionType + "
action for this flow. Please wait to resubmit and wait "
+ "for action to be completed.", HttpStatus.S_409_CONFLICT);
return;
}
- this.dagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME);
+ this.dagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId.toString(), actionType);
} catch (IOException | SQLException e) {
log.warn(
- String.format("Failed to add execution resume action for flow %s %s
%s to dag action store due to", flowGroup,
+ String.format("Failed to add %s action for flow %s %s %s to dag
action store due to:", actionType, flowGroup,
flowName, flowExecutionId), e);
- this.prepareError(e.getMessage(),
HttpStatus.S_500_INTERNAL_SERVER_ERROR);
- }
-
- }
-
- private void prepareError(String exceptionMessage, HttpStatus errorType) {
- if (errorType == HttpStatus.S_409_CONFLICT) {
- throw new RestLiServiceException(HttpStatus.S_409_CONFLICT,
exceptionMessage);
- } else if (errorType == HttpStatus.S_400_BAD_REQUEST) {
- throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST);
+ this.throwErrorResponse(e.getMessage(),
HttpStatus.S_500_INTERNAL_SERVER_ERROR);
}
- throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR,
exceptionMessage);
}
- @Override
- public UpdateResponse
delete(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, EmptyRecord>
key) {
-
- String flowGroup = key.getKey().getFlowGroup();
- String flowName = key.getKey().getFlowName();
- Long flowExecutionId = key.getKey().getFlowExecutionId();
- try {
- // If an existing kill request is still pending then do not accept this
request
- if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.KILL)) {
- this.prepareError("There is already a pending KILL action for this
flow. Please wait to resubmit and wait "
- + "for action to be completed.", HttpStatus.S_400_BAD_REQUEST);
- }
- this.dagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.KILL);
- return new UpdateResponse(HttpStatus.S_200_OK);
- } catch (IOException | SQLException e) {
- this.prepareError(String.format("Failed to add execution delete action
for flow %s %s %s to dag action store due to", flowGroup,
- flowName, flowExecutionId), HttpStatus.S_500_INTERNAL_SERVER_ERROR);
- return new UpdateResponse(HttpStatus.S_500_INTERNAL_SERVER_ERROR);
- }
+ private void throwErrorResponse(String exceptionMessage, HttpStatus
errorType) {
+ throw StringUtils.isBlank(exceptionMessage) ? new
RestLiServiceException(errorType) : new RestLiServiceException(errorType,
exceptionMessage);
}
}