takumiCX created SCB-1081:
-----------------------------

             Summary: CompositeOmegaCallback's compensate(TxEvent event) method has concurrency issues
                 Key: SCB-1081
                 URL: https://issues.apache.org/jira/browse/SCB-1081
             Project: Apache ServiceComb
          Issue Type: Bug
          Components: Saga
    Affects Versions: pack-0.3.0
            Reporter: takumiCX


The public void compensate(TxEvent event) method of CompositeOmegaCallback can fail with an exception under concurrent access.

 
{code:java}
public void compensate(TxEvent event) {
  Map<String, OmegaCallback> serviceCallbacks = callbacks.getOrDefault(event.serviceName(), emptyMap());
  if (serviceCallbacks.isEmpty()) {
    throw new AlphaException("No such omega callback found for service " + event.serviceName());
  }

  OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
  if (omegaCallback == null) {
    LOG.info("Cannot find the service with the instanceId {}, call the other instance.", event.instanceId());
    omegaCallback = serviceCallbacks.values().iterator().next();
  }

  try {
    omegaCallback.compensate(event);
  } catch (Exception e) {
    serviceCallbacks.values().remove(omegaCallback);
    throw e;
  }
}
{code}
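A thread may pass the if (omegaCallback == null) check and then lose the CPU before it executes omegaCallback = serviceCallbacks.values().iterator().next(). In the meantime, other threads can modify the serviceCallbacks map. On the alpha side there are currently two such cases: in one, alpha receives an onDisconnected request from an omega and removes the corresponding omega instance from the map; in the other, the thread executing a pendingTask retries a compensation, the compensation fails, and the catch block (serviceCallbacks.values().remove(omegaCallback); throw e;) also removes the corresponding omega instance from the map.

If the removed instance was the last one registered for the service, the map is empty when the first thread resumes, and iterator().next() throws NoSuchElementException.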
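To make the interleaving concrete, here is a minimal, hypothetical simulation (not ServiceComb code). It assumes plain Strings can stand in for OmegaCallback, uses made-up instance names, and interleaves the two "threads" by hand inside one thread so the outcome is deterministic. Under these assumptions, the fallback lookup runs against an empty map and throws NoSuchElementException.

{code:java}
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simulation of the check-then-act race in compensate().
// Strings stand in for OmegaCallback; both "threads" run in one thread,
// interleaved by hand, so the result does not depend on timing.
class CheckThenActRace {

    static String simulateInterleaving() {
        Map<String, String> serviceCallbacks = new ConcurrentHashMap<>();
        serviceCallbacks.put("instance-1", "callback-1");

        // Thread A: the map is not empty, so the AlphaException branch is skipped.
        if (serviceCallbacks.isEmpty()) {
            return "AlphaException";
        }
        // Thread A: the requested instance is unknown, so it will fall back.
        String omegaCallback = serviceCallbacks.get("instance-2");

        // Thread B runs here: onDisconnected (or a failed compensation in the
        // pendingTask thread) removes the only remaining instance.
        serviceCallbacks.values().remove("callback-1");

        // Thread A resumes and tries to pick "any other" instance.
        if (omegaCallback == null) {
            try {
                omegaCallback = serviceCallbacks.values().iterator().next();
            } catch (NoSuchElementException e) {
                return "NoSuchElementException";
            }
        }
        return omegaCallback;
    }

    public static void main(String[] args) {
        System.out.println(simulateInterleaving()); // prints NoSuchElementException
    }
}
{code}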

 

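Because this is a race condition, it occurs only rarely, which makes it hard to notice and reproduce. I reproduced it by modifying the source at two specific points: a 2-second sleep inside the fallback branch to widen the race window, and a forced exception in place of the real compensation call so that the catch block removes the instance.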
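{code:java}
if (omegaCallback == null) {
  LOG.info("Cannot find the service with the instanceId {}, call the other instance.", event.instanceId());
  try {
    // Injected delay: widens the window between the null check and iterator().next()
    TimeUnit.SECONDS.sleep(2);
  } catch (InterruptedException e) {
    // Restore the interrupt flag instead of swallowing it
    Thread.currentThread().interrupt();
  }

  omegaCallback = serviceCallbacks.values().iterator().next();
}
{code}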
 
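{code:java}
try {
  // Simulated failure: the real compensation call is replaced by an exception
  throw new RuntimeException();
  // omegaCallback.compensate(event);
} catch (Exception e) {
  serviceCallbacks.values().remove(omegaCallback);
  throw e;
}
{code}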
 

Here is the test code:
{code:java}
@Test
public void compensateWithConcurrency() throws InterruptedException {
  ConcurrentHashMap<String, OmegaCallback> serviceCallbacks = new ConcurrentHashMap<>();
  serviceCallbacks.put(instanceId1One, callback1One);
  callbacks.put(serviceName1, serviceCallbacks);

  // Thread 1: instanceId1Two is not registered, so it takes the fallback branch
  // and sleeps (injected delay) before calling iterator().next()
  new Thread(() -> compositeOmegaCallback.compensate(
      eventOf(serviceName1, instanceId1Two, TxStartedEvent))).start();

  TimeUnit.SECONDS.sleep(1);

  // Thread 2: finds instanceId1One; the forced failure removes it from the map
  new Thread(() -> compositeOmegaCallback.compensate(
      eventOf(serviceName1, instanceId1One, TxStartedEvent))).start();

  // Give thread 1 time to wake up and hit the now-empty map. The exception is
  // only printed by the worker thread; JUnit itself does not see it.
  TimeUnit.SECONDS.sleep(3);
}
{code}
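The sleep-based reproduction depends on timing. A hypothetical alternative, sketched under the same simplifying assumptions (Strings instead of OmegaCallback, made-up instance names, no alpha classes), uses two CountDownLatch instances to pause the compensating thread exactly between the null check and iterator().next() until another thread has removed the last instance. This makes the failing interleaving happen on every run.

{code:java}
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical latch-based reproduction: no sleeps, the interleaving is forced.
class LatchedRaceRepro {

    static Throwable reproduce() {
        Map<String, String> serviceCallbacks = new ConcurrentHashMap<>();
        serviceCallbacks.put("instance-1", "callback-1");

        CountDownLatch passedNullCheck = new CountDownLatch(1);
        CountDownLatch instanceRemoved = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<>();

        Thread compensator = new Thread(() -> {
            try {
                String omegaCallback = serviceCallbacks.get("instance-2");
                if (omegaCallback == null) {
                    passedNullCheck.countDown(); // now inside the race window
                    instanceRemoved.await();     // wait until the map has changed
                    omegaCallback = serviceCallbacks.values().iterator().next();
                }
            } catch (Throwable t) {
                failure.set(t);
            }
        });

        Thread remover = new Thread(() -> {
            try {
                passedNullCheck.await();
                // Stands in for onDisconnected or a failed compensation.
                serviceCallbacks.values().remove("callback-1");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                instanceRemoved.countDown();
            }
        });

        compensator.start();
        remover.start();
        try {
            compensator.join();
            remover.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return failure.get();
    }

    public static void main(String[] args) {
        System.out.println(reproduce()); // prints java.util.NoSuchElementException
    }
}
{code}

Unlike the JUnit test above, this variant also captures the exception from the worker thread, so an assertion can check it directly.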


