[ https://issues.apache.org/jira/browse/GOBBLIN-1140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17098110#comment-17098110 ]
Chen Guo commented on GOBBLIN-1140:
-----------------------------------

The fix should be easy: flip the order so that the spec is added to specStore before the listeners are triggered, like this:
{code:java}
if (!((FlowSpec) spec).isExplain()) {
  // Persist the spec first, so listeners only fire for specs that are already stored.
  specStore.addSpec(spec);
  metrics.updatePutSpecTime(startTime);
  if (triggerListener) {
    AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
    for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
      responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
    }
  }
}
{code}

> Listeners monitoring added FlowSpecs are fired before Specs are successfully added
> ----------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1140
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1140
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Chen Guo
>            Priority: Critical
>
> This bug was introduced in the commit for [GOBBLIN-1082] "compile a flow before storing it in spec catalog" on 3/12/2020.
> Previously, the listeners were triggered after the spec had been added to specStore.
> {code:java}
> specStore.addSpec(spec);
> metrics.updatePutSpecTime(startTime);
> if (triggerListener) {
>   AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
>   for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
>     responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
>   }
> }
> {code}
> After the change, the order becomes
> {code:java}
> if (triggerListener) {
>   AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
>   for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
>     responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
>   }
> }
> boolean compileSuccess = isCompileSuccessful(responseMap);
> if (compileSuccess) {
>   long startTime = System.currentTimeMillis();
>   metrics.updatePutSpecTime(startTime);
>   try {
>     specStore.addSpec(spec);
>   } catch (IOException e) {
>     throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
>   }
>   responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
> }
> {code}
> Due to this bug, the flow spec for the on-demand triggering cannot be deleted.
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)