[ 
https://issues.apache.org/jira/browse/GOBBLIN-1140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17098110#comment-17098110
 ] 

Chen Guo commented on GOBBLIN-1140:
-----------------------------------

The fix should be easy: flip the order so that the spec is added to specStore before the listeners are triggered, like this:

{code:java}
      if (!((FlowSpec) spec).isExplain()) {
        specStore.addSpec(spec);
        metrics.updatePutSpecTime(startTime);
        if (triggerListener) {
          AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response =
              this.listeners.onAddSpec(spec);
          for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry :
              response.getValue().getSuccesses().entrySet()) {
            responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
          }
        }
{code}
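
To make the effect of the ordering concrete, here is a toy sketch rather than Gobblin code. The class AddSpecOrderingSketch, its Listener interface, and the method names are all hypothetical. It also assumes a listener that removes a run-once spec as soon as it has handled it; the issue does not spell out that mechanism, so treat it as an illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for a spec catalog; not Gobblin's actual classes. */
final class AddSpecOrderingSketch {

  /** Hypothetical listener that is notified when a spec is added. */
  interface Listener {
    void onAddSpec(String specUri, Map<String, String> store);
  }

  private final Map<String, String> store = new LinkedHashMap<>();
  private final List<Listener> listeners = new ArrayList<>();

  void addListener(Listener listener) {
    listeners.add(listener);
  }

  /** Proposed order: persist the spec first, then notify the listeners. */
  void putStoreFirst(String uri, String body) {
    store.put(uri, body);
    for (Listener listener : listeners) {
      listener.onAddSpec(uri, store);
    }
  }

  /** Order described in the bug: notify the listeners first, then persist the spec. */
  void putListenersFirst(String uri, String body) {
    for (Listener listener : listeners) {
      listener.onAddSpec(uri, store);
    }
    store.put(uri, body);
  }

  boolean contains(String uri) {
    return store.containsKey(uri);
  }

  public static void main(String[] args) {
    // Assumed behavior: the listener deletes a run-once spec after handling it.
    Listener runOnce = (uri, store) -> store.remove(uri);

    AddSpecOrderingSketch fixed = new AddSpecOrderingSketch();
    fixed.addListener(runOnce);
    fixed.putStoreFirst("flow1", "spec body");
    System.out.println("store first: leftover=" + fixed.contains("flow1"));

    AddSpecOrderingSketch buggy = new AddSpecOrderingSketch();
    buggy.addListener(runOnce);
    buggy.putListenersFirst("flow1", "spec body");
    System.out.println("listeners first: leftover=" + buggy.contains("flow1"));
  }
}
```

Under that assumption, the listener's delete is a no-op when it runs before the store write, so the spec is left behind; with the store write first, the delete finds the spec and removes it.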




> Listeners monitoring added FlowSpecs are fired before Specs are successfully 
> added
> ----------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1140
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1140
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Chen Guo
>            Priority: Critical
>
> This bug was introduced in the commit for [GOBBLIN-1082] "compile a flow 
> before storing it in spec catalog" on 3/12/2020.
> Previously, the listeners were triggered after the spec had been added to 
> specStore.
> {code:java}
> specStore.addSpec(spec);
> metrics.updatePutSpecTime(startTime);
> if (triggerListener) {
>   AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response =
>       this.listeners.onAddSpec(spec);
>   for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry :
>       response.getValue().getSuccesses().entrySet()) {
>     responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
>   }
> }
> {code}
> After the change, the order becomes
> {code:java}
> if (triggerListener) {
>   AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response =
>       this.listeners.onAddSpec(spec);
>   for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry :
>       response.getValue().getSuccesses().entrySet()) {
>     responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
>   }
> }
> boolean compileSuccess = isCompileSuccessful(responseMap);
> if (compileSuccess) {
>   long startTime = System.currentTimeMillis();
>   metrics.updatePutSpecTime(startTime);
>   try {
>     specStore.addSpec(spec);
>   } catch (IOException e) {
>     throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
>   }
>   responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
> }
> {code}
> Due to this bug, the flow spec for the on-demand triggering cannot be deleted.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
