arjun4084346 commented on a change in pull request #2981:
URL: https://github.com/apache/incubator-gobblin/pull/2981#discussion_r421190385



##########
File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
##########
@@ -293,30 +296,27 @@ public Spec getSpecWrapper(URI uri) {
    */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
     Map<String, AddSpecResponse> responseMap = new HashMap<>();
+    FlowSpec flowSpec = (FlowSpec) spec;
     Preconditions.checkState(state() == State.RUNNING, String.format("%s is 
not running.", this.getClass().getName()));
-    Preconditions.checkNotNull(spec);
+    Preconditions.checkNotNull(flowSpec);
+
+    log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", 
flowSpec.getUri(), flowSpec.getConfigAsProperties()));
+    try {
+      long startTime = System.currentTimeMillis();
+      specStore.addSpec(flowSpec);
+      metrics.updatePutSpecTime(startTime);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot add Spec to Spec store: " + flowSpec, 
e);
+    }
 
-    log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", 
spec.getUri(),
-        ((FlowSpec) spec).getConfigAsProperties()));
     if (triggerListener) {
-      AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, 
AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+      AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, 
AddSpecResponse>> response = this.listeners.onAddSpec(flowSpec);
       for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> 
entry: response.getValue().getSuccesses().entrySet()) {
         responseMap.put(entry.getKey().getName(), 
entry.getValue().getResult());
       }
     }
 
-    boolean compileSuccess = isCompileSuccessful(responseMap);
-
-    if (compileSuccess) {
-      long startTime = System.currentTimeMillis();
-      metrics.updatePutSpecTime(startTime);
-      try {
-        if (!((FlowSpec) spec).isExplain()) {
-          specStore.addSpec(spec);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException("Cannot add Spec to Spec store: " + spec, 
e);
-      }
+    if (isCompileSuccessful(responseMap)) {
       responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new 
AddSpecResponse<>("true"));
     } else {
       responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new 
AddSpecResponse<>("false"));

Review comment:
       Removal of the flow spec has been moved to 
GobblinServiceJobScheduler.java, because it was already doing the removal in 
some cases (see line 314). [This is option 3 from our doc.]
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to