This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 98adee9  [GOBBLIN-840] avoid creating a flow execution id by both 
master and slave gaas
98adee9 is described below

commit 98adee9dd5d26dbf520c2c5d2e2cd3574eb134a2
Author: Arjun <[email protected]>
AuthorDate: Thu Aug 1 15:23:18 2019 -0700

    [GOBBLIN-840] avoid creating a flow execution id by both master and slave 
gaas
    
    Closes #2698 from
    arjun4084346/deduplicateFlowExecutionId
---
 .../gobblin/service/FlowConfigResourceLocalHandler.java     | 13 ++++++++++++-
 .../gobblin/service/FlowConfigV2ResourceLocalHandler.java   |  4 +---
 .../restli/GobblinServiceFlowConfigResourceHandler.java     |  7 +++++++
 3 files changed, 20 insertions(+), 4 deletions(-)

diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 19544ad..b20b5be 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -243,7 +243,18 @@ public class FlowConfigResourceLocalHandler implements 
FlowConfigsResourceHandle
       // If it is not a run-once job, we should not add flow execution id here,
       // because execution id is generated for every scheduled execution of 
the flow and cannot be materialized to
       // the flow catalog. In this case, this id is added during flow 
compilation.
-      configBuilder.addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
String.valueOf(System.currentTimeMillis()));
+      String flowExecutionId;
+      if 
(flowConfig.getProperties().containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
 {
+        flowExecutionId = 
flowConfig.getProperties().get(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+        // FLOW_EXECUTION_ID may already be present in FlowSpec in cases
+        // where the FlowSpec is forwarded by a slave to the master.
+        log.info("Using the existing flowExecutionId {} for {},{}", 
flowExecutionId, flowConfig.getId().getFlowGroup(), 
flowConfig.getId().getFlowName());
+      } else {
+        flowExecutionId = String.valueOf(System.currentTimeMillis());
+        log.info("Created a flowExecutionId {} for {},{}", flowExecutionId, 
flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName());
+      }
+      flowConfig.getProperties().put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
flowExecutionId);
+      configBuilder.addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
flowExecutionId);
     }
 
     if (flowConfig.hasExplain()) {
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index effc415..9fcfec1 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -20,13 +20,11 @@ import java.util.Map;
 import org.apache.commons.lang3.StringEscapeUtils;
 
 import com.linkedin.data.template.StringMap;
-import com.linkedin.data.transform.DataProcessingException;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.HttpStatus;
 import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.server.CreateKVResponse;
 import com.linkedin.restli.server.UpdateResponse;
-import com.linkedin.restli.server.util.PatchApplier;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -50,7 +48,7 @@ public class FlowConfigV2ResourceLocalHandler extends 
FlowConfigResourceLocalHan
     this.createFlow.mark();
 
     if (flowConfig.hasExplain()) {
-      createLog += " explain " + Boolean.toString(flowConfig.isExplain());
+      createLog += " explain " + flowConfig.isExplain();
     }
     log.info(createLog);
     FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index d056fe1..2be2572 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -37,6 +37,7 @@ import com.linkedin.restli.server.util.PatchApplier;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigLoggedException;
 import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
@@ -96,6 +97,12 @@ public class GobblinServiceFlowConfigResourceHandler 
implements FlowConfigsResou
     String flowName = flowConfig.getId().getFlowName();
     String flowGroup = flowConfig.getId().getFlowGroup();
 
+    if 
(flowConfig.getProperties().containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
 {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
+          String.format("%s cannot be set by the user", 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
+          null);
+    }
+
     checkHelixConnection(ServiceConfigKeys.HELIX_FLOWSPEC_ADD, flowName, 
flowGroup);
 
     try {

Reply via email to