[ https://issues.apache.org/jira/browse/SCB-1081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Willem Jiang resolved SCB-1081.
-------------------------------
    Resolution: Fixed

Merged the patch into the master branch.

> CompositeOmegaCallback's compensate(TxEvent event) method has concurrency issues
> --------------------------------------------------------------------------------
>
>                 Key: SCB-1081
>                 URL: https://issues.apache.org/jira/browse/SCB-1081
>             Project: Apache ServiceComb
>          Issue Type: Bug
>          Components: Saga
>            Reporter: takumiCX
>            Assignee: Willem Jiang
>            Priority: Major
>             Fix For: pack-0.3.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The public void compensate(TxEvent event) method of the CompositeOmegaCallback class can fail under concurrent access:
>
> {code:java}
> public void compensate(TxEvent event) {
>   Map<String, OmegaCallback> serviceCallbacks =
>       callbacks.getOrDefault(event.serviceName(), emptyMap());
>   if (serviceCallbacks.isEmpty()) {
>     throw new AlphaException("No such omega callback found for service " + event.serviceName());
>   }
>   OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
>   if (omegaCallback == null) {
>     LOG.info("Cannot find the service with the instanceId {}, call the other instance.", event.instanceId());
>     omegaCallback = serviceCallbacks.values().iterator().next();
>   }
>   try {
>     omegaCallback.compensate(event);
>   } catch (Exception e) {
>     serviceCallbacks.values().remove(omegaCallback);
>     throw e;
>   }
> }
> {code}
>
> A thread may pass the if (omegaCallback == null) check and then lose the CPU before it executes omegaCallback = serviceCallbacks.values().iterator().next(). In the meantime, other threads can modify the serviceCallbacks map. On the Alpha side there are currently two ways this happens. First, Alpha receives an onDisconnected request from an Omega and removes that Omega instance from the map. Second, a thread running a pendingTask retries the compensation, fails, and executes the catch block (serviceCallbacks.values().remove(omegaCallback); throw e;), which also removes the corresponding Omega instance from the map. If the map has become empty by the time iterator().next() runs, that call throws NoSuchElementException.
>
> Because this is a race condition, it occurs only rarely, which makes it hard to notice and reproduce. I reproduced it by modifying the source code at two specific points. First, a delay in the fallback branch:
>
> {code:java}
> if (omegaCallback == null) {
>   LOG.info("Cannot find the service with the instanceId {}, call the other instance.", event.instanceId());
>   try {
>     // injected delay to widen the race window
>     TimeUnit.SECONDS.sleep(2);
>   } catch (InterruptedException e) {
>     e.printStackTrace();
>   }
>   omegaCallback = serviceCallbacks.values().iterator().next();
> }
> {code}
>
> Second, a forced failure in place of the real compensation call:
>
> {code:java}
> try {
>   throw new RuntimeException(); // injected failure
>   // omegaCallback.compensate(event);
> } catch (Exception e) {
> {code}
>
> Here is the test code:
>
> {code:java}
> @Test
> public void compensateWithConcurrency() throws InterruptedException {
>   ConcurrentHashMap<String, OmegaCallback> serviceCallbacks = new ConcurrentHashMap<>();
>   serviceCallbacks.put(instanceId1One, callback1One);
>   callbacks.put(serviceName1, serviceCallbacks);
>   // Thread 1: instanceId1Two is not registered, so it takes the fallback branch and sleeps.
>   new Thread(new Runnable() {
>     @Override
>     public void run() {
>       compositeOmegaCallback.compensate(eventOf(serviceName1, instanceId1Two, TxStartedEvent));
>     }
>   }).start();
>   TimeUnit.SECONDS.sleep(1);
>   // Thread 2: the injected failure makes the catch block remove callback1One, emptying the map.
>   new Thread(new Runnable() {
>     @Override
>     public void run() {
>       compositeOmegaCallback.compensate(eventOf(serviceName1, instanceId1One, TxStartedEvent));
>     }
>   }).start();
>   // Give thread 1 time to wake up and call iterator().next() on the now-empty map.
>   TimeUnit.SECONDS.sleep(3);
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
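One way to close the race window described in this issue is sketched below. It is a minimal sketch, not necessarily the patch that was merged, and it assumes the per-service map is a ConcurrentHashMap, as in the test. Instead of checking isEmpty() early and calling iterator().next() later, the fallback takes a single iterator and calls hasNext() before next(). ConcurrentHashMap iterators are weakly consistent, so once hasNext() returns true, next() returns that element even if another thread removes it in between. The class name SafeCallbackSelection, the select() helper, the String parameter used in place of TxEvent, and IllegalStateException used in place of AlphaException are all illustrative assumptions.

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class SafeCallbackSelection {

  // Hypothetical stand-in for OmegaCallback; the real interface takes a TxEvent.
  public interface OmegaCallback {
    void compensate(String instanceId);
  }

  // Returns the callback registered for instanceId, or any other registered
  // instance as a fallback. hasNext() and next() are called on the same
  // iterator, so a map emptied by another thread yields Optional.empty()
  // instead of NoSuchElementException. Because ConcurrentHashMap iterators are
  // weakly consistent, the returned callback may have been removed concurrently.
  public static Optional<OmegaCallback> select(
      ConcurrentHashMap<String, OmegaCallback> serviceCallbacks, String instanceId) {
    OmegaCallback callback = serviceCallbacks.get(instanceId);
    if (callback != null) {
      return Optional.of(callback);
    }
    Iterator<OmegaCallback> it = serviceCallbacks.values().iterator();
    return it.hasNext() ? Optional.of(it.next()) : Optional.empty();
  }

  // Same shape as compensate() in the issue: pick a callback, invoke it,
  // and drop it from the map if the call fails.
  public static void compensate(
      ConcurrentHashMap<String, OmegaCallback> serviceCallbacks, String instanceId) {
    OmegaCallback callback = select(serviceCallbacks, instanceId)
        .orElseThrow(() -> new IllegalStateException(
            "No omega callback found for instance " + instanceId));
    try {
      callback.compensate(instanceId);
    } catch (RuntimeException e) {
      // Removes one mapping whose value is this callback (no-op if already gone).
      serviceCallbacks.values().remove(callback);
      throw e;
    }
  }

  public static void main(String[] args) {
    ConcurrentHashMap<String, OmegaCallback> callbacks = new ConcurrentHashMap<>();
    callbacks.put("instance-1", id -> System.out.println("instance-1 compensates for " + id));
    compensate(callbacks, "instance-2"); // instance-2 is unknown, so instance-1 is used
    callbacks.clear(); // simulate another thread removing the last instance
    try {
      compensate(callbacks, "instance-2");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The design accepts a possibly stale callback rather than locking the map: if that callback fails, the catch block attempts the removal again and rethrows, which matches the original code's behavior, and an empty map now produces a descriptive exception instead of a NoSuchElementException from deep inside the iterator.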