takumiCX created SCB-1081:
-----------------------------
Summary: CompositeOmegaCallback's compensate(TxEvent event) method has concurrency issues
Key: SCB-1081
URL: https://issues.apache.org/jira/browse/SCB-1081
Project: Apache ServiceComb
Issue Type: Bug
Components: Saga
Affects Versions: pack-0.3.0
Reporter: takumiCX
The public void compensate(TxEvent event) method of the CompositeOmegaCallback class can throw an exception when it is called concurrently.
{code:java}
public void compensate(TxEvent event) {
  Map<String, OmegaCallback> serviceCallbacks =
      callbacks.getOrDefault(event.serviceName(), emptyMap());
  if (serviceCallbacks.isEmpty()) {
    throw new AlphaException("No such omega callback found for service " + event.serviceName());
  }
  OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
  if (omegaCallback == null) {
    LOG.info("Cannot find the service with the instanceId {}, call the other instance.",
        event.instanceId());
    // Race window: another thread may empty serviceCallbacks after the isEmpty() check above
    omegaCallback = serviceCallbacks.values().iterator().next();
  }
  try {
    omegaCallback.compensate(event);
  } catch (Exception e) {
    // A failed compensation also removes the instance from the shared map
    serviceCallbacks.values().remove(omegaCallback);
    throw e;
  }
}
{code}
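A thread may pass the if (omegaCallback == null) check and then lose the CPU before it executes omegaCallback = serviceCallbacks.values().iterator().next(). In the meantime, other threads can modify the serviceCallbacks map. On the alpha side I currently see two such cases: (1) alpha receives an onDisconnected request from an omega and removes that omega instance from the map; (2) the thread running a pendingTask retries a compensation, the retry fails, and its catch block
catch (Exception e) {
  serviceCallbacks.values().remove(omegaCallback);
  throw e;
}
also removes the corresponding omega instance from the map. If this leaves the map empty, the interrupted thread's call to serviceCallbacks.values().iterator().next() throws a java.util.NoSuchElementException instead of performing the compensation.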
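The interleaving described above can also be forced deterministically, without sleeps, in a small standalone model. This is only a sketch: FallbackRaceDemo, runInterleaving and the String values standing in for OmegaCallback instances are hypothetical, and the per-service map is assumed to be a ConcurrentHashMap as in the test further down. Two latches pin the compensating thread between the null check and iterator().next() while the main thread removes the only registered instance.

```java
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, stripped-down model of the fallback path in compensate():
// String values stand in for OmegaCallback instances.
class FallbackRaceDemo {

  // Forces the problematic interleaving and returns the simple name of the
  // exception the compensating thread hit, or "none" if it succeeded.
  static String runInterleaving() throws InterruptedException {
    Map<String, String> serviceCallbacks = new ConcurrentHashMap<>();
    serviceCallbacks.put("instance-1", "callback-1");

    CountDownLatch checkedNull = new CountDownLatch(1);
    CountDownLatch removed = new CountDownLatch(1);
    AtomicReference<String> failure = new AtomicReference<>("none");

    Thread compensator = new Thread(() -> {
      try {
        String callback = serviceCallbacks.get("instance-2"); // unknown instance
        if (callback == null) {
          checkedNull.countDown(); // passed the null check ...
          removed.await();         // ... and "loses the CPU" here
          callback = serviceCallbacks.values().iterator().next(); // map is now empty
        }
      } catch (NoSuchElementException | InterruptedException e) {
        failure.set(e.getClass().getSimpleName());
      }
    });
    compensator.start();

    checkedNull.await();
    // Plays the role of onDisconnected or a failed pendingTask retry:
    // the only registered instance is removed from the shared map.
    serviceCallbacks.values().remove("callback-1");
    removed.countDown();

    compensator.join();
    return failure.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runInterleaving()); // prints NoSuchElementException
  }
}
```

Because the latches fix the order of events, the model fails on every run, unlike the sleep-based reproduction, which depends on timing.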
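Because this is a race condition, it occurs only rarely, which makes it hard to detect and reproduce. I reproduced it by modifying the source at specific points. I added a delay inside the fallback branch: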
{code:java}
if (omegaCallback == null) {
  LOG.info("Cannot find the service with the instanceId {}, call the other instance.",
      event.instanceId());
  try {
    // Injected delay: widens the window between the null check and iterator().next()
    TimeUnit.SECONDS.sleep(2);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  omegaCallback = serviceCallbacks.values().iterator().next();
}
{code}
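I also made the compensation call always fail:
{code:java}
try {
  // Injected failure: always take the catch branch, which removes the callback from the map
  throw new RuntimeException();
  // omegaCallback.compensate(event);
} catch (Exception e) {
  serviceCallbacks.values().remove(omegaCallback);
  throw e;
}
{code}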
Here is the test code:
{code:java}
@Test
public void compensateWithConcurrency() throws InterruptedException {
  ConcurrentHashMap<String, OmegaCallback> serviceCallbacks = new ConcurrentHashMap<>();
  serviceCallbacks.put(instanceId1One, callback1One);
  callbacks.put(serviceName1, serviceCallbacks);

  // Thread 1: instanceId1Two is not registered, so it enters the fallback branch and sleeps 2 s
  new Thread(() -> compositeOmegaCallback.compensate(
      eventOf(serviceName1, instanceId1Two, TxStartedEvent))).start();
  TimeUnit.SECONDS.sleep(1);

  // Thread 2: its (forced) compensation failure removes callback1One, leaving the map empty
  new Thread(() -> compositeOmegaCallback.compensate(
      eventOf(serviceName1, instanceId1One, TxStartedEvent))).start();

  // Thread 1 wakes up and calls iterator().next() on the empty map. The resulting
  // NoSuchElementException is an uncaught exception in thread 1, not a test failure.
  TimeUnit.SECONDS.sleep(3);
}
{code}
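One possible mitigation, sketched here under stated assumptions and not taken from the project's actual fix: take a single iterator over the per-service map and check hasNext() before calling next(), so that an emptied map yields a clear "no callback" error from the caller instead of a NoSuchElementException. CallbackSelector and select are hypothetical names, and the per-service map is assumed to be a ConcurrentHashMap as in the test above.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: picks the callback for instanceId, falling back to any other
// registered instance, without assuming the map is still non-empty.
final class CallbackSelector {

  private CallbackSelector() {}

  // Returns null when no instance is left; the caller would then throw its own
  // "No such omega callback" error instead of leaking NoSuchElementException.
  static <T> T select(Map<String, T> serviceCallbacks, String instanceId) {
    T callback = serviceCallbacks.get(instanceId);
    if (callback != null) {
      return callback;
    }
    // Assumption: with OpenJDK's ConcurrentHashMap, next() returns the element that
    // hasNext() already found, so a removal between the two calls cannot make next()
    // throw. The returned callback may, however, belong to a just-removed instance.
    Iterator<T> it = serviceCallbacks.values().iterator();
    return it.hasNext() ? it.next() : null;
  }

  public static void main(String[] args) {
    Map<String, String> callbacks = new ConcurrentHashMap<>();
    System.out.println(select(callbacks, "instance-2")); // prints null
    callbacks.put("instance-1", "callback-1");
    System.out.println(select(callbacks, "instance-2")); // prints callback-1
  }
}
```

The design choice is to make "map became empty" an explicit, checked outcome rather than relying on the earlier isEmpty() check, which another thread can invalidate at any time.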
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)