Hi Willem, when I throw an exception on purpose, the Alpha side gets an exception like this:

09:02:26.053 [grpc-default-executor-3] ERROR io.grpc.internal.SerializingExecutor - Exception while executing runnable io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@435c0712
io.grpc.StatusRuntimeException: CANCELLED: call already cancelled

The Omega side gets an exception like this:

2019-07-25 09:02:26.046 ERROR 31040 --- [ault-executor-2] o.a.s.p.o.c.g.c.ReconnectStreamObserver : Failed to process grpc coordinate command.
io.grpc.StatusRuntimeException: CANCELLED: Failed to read message.
...
Caused by: java.lang.RuntimeException: eee
    at org.apache.servicecomb.pack.omega.transaction.CompensationMessageHandler.onReceive(CompensationMessageHandler.java:36) ~[classes/:na]

Omega then tries to reconnect to Alpha:

2019-07-25 09:02:26.048 INFO 31040 --- [pool-2-thread-1] .a.s.p.o.c.g.c.PushBackReconnectRunnable : Retry connecting to alpha at localhost:8080
2019-07-25 09:02:26.055 ERROR 31040 --- [pool-2-thread-1] .a.s.p.o.c.g.c.PushBackReconnectRunnable : Failed to reconnect to alpha at localhost:8080
io.grpc.StatusRuntimeException: UNKNOWN
...
2019-07-25 09:02:29.055 INFO 31040 --- [pool-2-thread-1] .a.s.p.o.c.g.c.PushBackReconnectRunnable : Retry connecting to alpha at localhost:8080
2019-07-25 09:02:29.062 INFO 31040 --- [pool-2-thread-1] .a.s.p.o.c.g.c.PushBackReconnectRunnable : Retry connecting to alpha at localhost:8080 is successful

In addition, the txevent table records these events:

booking SagaStartedEvent
car     TxStartedEvent
car     TxEndedEvent
hotel   TxStartedEvent
hotel   TxAbortedEvent
booking TxAbortedEvent

The command table looks like this:

car PENDING

Willem Jiang <[email protected]> wrote on Thu, Jul 25, 2019 at 8:54 AM:

> You don't need to remove the TxCompensatedEvent sending code.
> I think you just need to throw a runtime exception in the onReceive
> method to see if the Alpha can get the error.
>
> Willem Jiang
>
> Twitter: willemjiang
> Weibo: 姜宁willem
>
> On Wed, Jul 24, 2019 at 10:33 PM Daniel Qian <[email protected]> wrote:
> >
> > Hi Willem.
> > I did some experiments with CompensationMessageHandler.
> >
> > In the first case, I send a delayed TxCompensatedEvent to simulate an async
> > TxCompensated notification:
> >
> > @Override
> > public void onReceive(final String globalTxId, final String localTxId,
> >     final String parentTxId, final String compensationMethod,
> >     Object... payloads) {
> >   context.apply(globalTxId, localTxId, compensationMethod, payloads);
> >   new Thread(new Runnable() {
> >     @Override
> >     public void run() {
> >       try {
> >         Thread.sleep(10000L);
> >       } catch (InterruptedException e) {
> >         e.printStackTrace();
> >       }
> >       sender.send(new TxCompensatedEvent(globalTxId, localTxId,
> >           parentTxId, compensationMethod));
> >     }
> >   }).start();
> > }
> >
> > I watched the command table and saw the compensation command status go
> > from PENDING to DONE.
> > I also watched the txevent table and found that SagaEndedEvent was recorded correctly.
> >
> > In the second case, I just removed the TxCompensatedEvent sending code:
> >
> > @Override
> > public void onReceive(final String globalTxId, final String localTxId,
> >     final String parentTxId, final String compensationMethod,
> >     Object... payloads) {
> >   context.apply(globalTxId, localTxId, compensationMethod, payloads);
> > }
> >
> > In the command table the compensation command remains PENDING, and in the
> > txevent table no SagaEndedEvent appears.
> >
> > Zheng Feng <[email protected]> wrote on Wed, Jul 24, 2019 at 4:07 PM:
> > >
> > > So gRPC could support async invocation?
> > >
> > > Willem Jiang <[email protected]> wrote on Wed, Jul 24, 2019 at 2:31 PM:
> > >
> > > > Hi Daniel,
> > > >
> > > > If you take a look at the Omega side, the CompensationMessageHandler is called in the onNext() method of GrpcCompensateStreamObserver.
> > > > If something goes wrong in the CompensationMessageHandler onReceive() method, the stream could be broken and Alpha can know about it immediately. (You can write a simple test case to verify it.)
> > > >
> > > > Willem Jiang
> > > >
> > > > Twitter: willemjiang
> > > > Weibo: 姜宁willem
> > > >
> > > > On Wed, Jul 24, 2019 at 12:27 PM Daniel Qian <[email protected]> wrote:
> > > > >
> > > > > Thanks for sharing your thoughts, Willem. But I didn't really get your point.
> > > > >
> > > > > I read the code and found that the compensation action works like this:
> > > > >
> > > > > 1. Alpha's GrpcOmegaCallback sends a GrpcCompensateCommand to Omega.
> > > > > 2. Omega's CompensationMessageHandler handles the command, executes the compensation logic and sends a TxCompensatedEvent to Alpha.
> > > > >
> > > > > In this process, Alpha doesn't wait for the compensation success message after sending the compensation command; it just returns. So the process is already async.
> > > > >
> > > > > If what I said above is correct, please read on; if not, please correct me.
> > > > >
> > > > > With the current implementation, it's possible that the compensation command is sent but no compensation success message is ever received. I think what you said is that Alpha should have a mechanism to handle compensation timeout: retry, or just log it. But I think this is an improvement that should be done on the Alpha side. Am I right?
> > > > >
> > > > > In my opinion, to support async compensation, the only thing we need to do is provide a way to let user code send the TxCompensatedEvent. Of course I don't mean that we should expose the underlying communication, GrpcSagaClientMessageSender for example, to user code. Providing an encapsulated interface would be better, just like what I mentioned before.
> > > > >
> > > > > Zheng Feng <[email protected]> wrote on Tue, Jul 23, 2019 at 2:10 PM:
> > > > >
> > > > > > OK, I understand. From my previous experience with JTA and XA, it looks similar to XA recovery (XAResource.recover()), which retrieves the in-doubt transactions. So the Omega side or the application has to keep records of the pending transactions, is that right?
> > > > > >
> > > > > > Willem Jiang <[email protected]> wrote on Mon, Jul 22, 2019 at 11:51 PM:
> > > > > >
> > > > > > > Yes, we can kick off the recovery process or the status verification process in Alpha if the distributed transaction is in a suspended state, but in this case we need Alpha to talk to the customer application to decide whether to run the recovery again or just do some auditing there. It depends on the customer's configuration.
> > > > > > >
> > > > > > > Willem Jiang
> > > > > > >
> > > > > > > Twitter: willemjiang
> > > > > > > Weibo: 姜宁willem
> > > > > > >
> > > > > > > On Mon, Jul 22, 2019 at 11:38 PM Zheng Feng <[email protected]> wrote:
> > > > > > > >
> > > > > > > > Hi Willem,
> > > > > > > >
> > > > > > > > By "providing some further processing extension", do you mean that the customer could help the state machine recover from the suspension states?
> > > > > > > >
> > > > > > > > Willem Jiang <[email protected]> wrote on Mon, Jul 22, 2019 at 11:28 PM:
> > > > > > > >
> > > > > > > > > If we provide the async compensation call, we need to do lots of things on both the Alpha and Omega sides. My suggestion is that we just provide a sync way to get the compensation feedback immediately, to keep the design simpler.
> > > > > > > > >
> > > > > > > > > As we are reimplementing Alpha with a state machine, there are some suspension states which could be caused by a timeout or a missing event message.
> > > > > > > > > I had a long talk with ZhangLei, and we agreed that we could leave the status check or recovery to the customer by providing some further processing extension, as there are too many details in customer code to think about.
> > > > > > > > >
> > > > > > > > > Anyway, it's my pleasure to have this kind of discussion with the team. It's the beauty of open source project development: we are tackling interesting problems by working together across different companies :)
> > > > > > > > >
> > > > > > > > > Willem Jiang
> > > > > > > > >
> > > > > > > > > Twitter: willemjiang
> > > > > > > > > Weibo: 姜宁willem
> > > > > > > > >
> > > > > > > > > On Mon, Jul 22, 2019 at 5:14 PM Zheng Feng <[email protected]> wrote:
> > > > > > > > > >
> > > > > > > > > > Regarding the compensation method, I think I discussed with Willem before that we could introduce a @Status annotation for the Alpha server to query the compensation status.
> > > > > > > > > > When the compensation method is async, which means it cannot respond immediately, it would return the COMPENSATING result and the Alpha server could query the @Status method to check the compensation status. If this method returns COMPENSATE_OK, the Alpha server will mark the local transaction as compensated; otherwise it will mark it as compensate_failed.
> > > > > > > > > >
> > > > > > > > > > Daniel Qian <[email protected]> wrote on Sun, Jul 21, 2019 at 8:37 PM:
> > > > > > > > > >
> > > > > > > > > > > I rethought the idea I proposed. Yes, providing low-level APIs is a bad idea, and I also don't suggest letting user code use the omega-xxx-transport API.
> > > > > > > > > > >
> > > > > > > > > > > I think there are three essential issues:
> > > > > > > > > > >
> > > > > > > > > > > 1. How to pass tx context info across threads.
> > > > > > > > > > > 2. How to asynchronously tell Alpha that the Saga is ended or aborted, which means not triggered when the @SagaStart method returns.
> > > > > > > > > > > 3. How to asynchronously tell Alpha that the LocalTx is ended or aborted, which means not triggered when the @Compensable method returns.
> > > > > > > > > > >
> > > > > > > > > > > I think we can keep using @SagaStart and @Compensable for the XXXStartedEvent, and provide a helper to manually end/abort the Saga/LocalTx. Thanks to PR #506 (SCB-1385) we can use TransactionContext to achieve that.
> > > > > > > > > > > Below is a code sample:
> > > > > > > > > > >
> > > > > > > > > > > // Proposal only: the async attribute and the omega helper do not exist yet.
> > > > > > > > > > > @SagaStart(async=true)
> > > > > > > > > > > public void foo() {
> > > > > > > > > > >   TransactionContext txContext = OmegaContext.getTransactionContext();
> > > > > > > > > > >   someAsyncCall()
> > > > > > > > > > >     .onSuccess(() -> omega.endSaga(txContext))
> > > > > > > > > > >     .onException(() -> omega.abortSaga(txContext));
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > @Compensable(async=true, compensationMethod="rollbackBar")
> > > > > > > > > > > public void bar() {
> > > > > > > > > > >   TransactionContext txContext = OmegaContext.getTransactionContext();
> > > > > > > > > > >   someAsyncCall()
> > > > > > > > > > >     .onSuccess(() -> omega.endTx(txContext))
> > > > > > > > > > >     .onException(() -> omega.abortTx(txContext));
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > The async attribute on @SagaStart and @Compensable prevents the Saga/LocalTx from being ended when the method returns.
> > > > > > > > > > > The TransactionContext object can be passed around safely because it's immutable.
> > > > > > > > > > > What I have not thought through is how to deal with the compensation method if it's also async.
> > > > > > > > > > >
> > > > > > > > > > > Willem Jiang <[email protected]> wrote on Sat, Jul 20, 2019 at 10:30 PM:
> > > > > > > > > > >
> > > > > > > > > > > > Yeah, I agree we need to provide some low-level API for the user to pass.
> > > > > > > > > > > > In the recent change for SCB-1386, I introduced the TransactionContext object, which holds the GID and LID; we may add some other transaction context information there too.
> > > > > > > > > > > > If the sub-transaction happens in another JVM, we need to pass the TxContext across the JVMs with the help of omega-xxx-transport.
> > > > > > > > > > > >
> > > > > > > > > > > > We already have some internal API to send messages from Omega to Alpha. I prefer to use annotations instead of exposing a low-level API to the user.
> > > > > > > > > > > >
> > > > > > > > > > > > Willem Jiang
> > > > > > > > > > > >
> > > > > > > > > > > > Twitter: willemjiang
> > > > > > > > > > > > Weibo: 姜宁willem
> > > > > > > > > > > >
> > > > > > > > > > > > On Sat, Jul 20, 2019 at 9:50 PM Daniel Qian <[email protected]> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > After looking into SCB-163, SCB-1385 and SCB-1386, I have some thoughts on Saga when async invocation is involved.
> > > > > > > > > > > > > The current implementation is basically based on sync invocation, and it makes some assumptions:
> > > > > > > > > > > > >
> > > > > > > > > > > > > 1. When the @SagaStart method returns, the Saga is finished.
> > > > > > > > > > > > > 2. When the @Compensable method returns/throws an exception, the Local Tx succeeds/fails.
> > > > > > > > > > > > > 3. When the compensationMethod returns, the Local Tx is compensated.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Even considering what SCB-100 provided:
> > > > > > > > > > > > >
> > > > > > > > > > > > > 1. Add the @OmegaContextAware annotation, enabling a java.util.concurrent.Executor to inject the OmegaContext into the threads it manages/spawns.
> > > > > > > > > > > > > 2. Make OmegaContext use an InheritableThreadLocal field so that a child thread inherits the parent thread's Local Tx info.
> > > > > > > > > > > > >
> > > > > > > > > > > > > There are still some limitations:
> > > > > > > > > > > > >
> > > > > > > > > > > > > 1. @OmegaContextAware is only viable if you use the Spring framework.
> > > > > > > > > > > > > 2. @OmegaContextAware and OmegaContext's InheritableThreadLocal field assume that the calling thread or initiator thread has the Local Tx info.
> > > > > > > > > > > > >
> > > > > > > > > > > > > What if user code uses the producer-consumer pattern, in which InheritableThreadLocal can't work?
> > > > > > > > > > > > > What if user code uses a thread scheduling library with which we cannot use @OmegaContextAware, RxJava and Reactor for example?
> > > > > > > > > > > > >
> > > > > > > > > > > > > I think we could provide some low-level APIs so that user code can manually start/end a Saga and a Local Tx, something like below:
> > > > > > > > > > > > >
> > > > > > > > > > > > > TxContext context = omega.startSaga();
> > > > > > > > > > > > > TxContext subTxContext = omega.startTx(context);
> > > > > > > > > > > > > omega.endTx(subTxContext);
> > > > > > > > > > > > > omega.abortTx(subTxContext);
> > > > > > > > > > > > > omega.abortSaga(context);
> > > > > > > > > > > > > omega.endSaga(context);
> > > > > > > > > > > > >
> > > > > > > > > > > > > TxContext is just an immutable DTO like this:
> > > > > > > > > > > > >
> > > > > > > > > > > > > public class TxContext {
> > > > > > > > > > > > >   private final String globalTxId;
> > > > > > > > > > > > >   private final String localTxId;
> > > > > > > > > > > > >
> > > > > > > > > > > > >   public TxContext(String globalTxId, String localTxId) {
> > > > > > > > > > > > >     this.globalTxId = globalTxId;
> > > > > > > > > > > > >     this.localTxId = localTxId;
> > > > > > > > > > > > >   }
> > > > > > > > > > > > > }
> > > > > > > > > > > > >
> > > > > > > > > > > > > The above is just a rough idea. Any thoughts?
> > > > > > > > > > > > > --
> > > > > > > > > > > > > Daniel Qian
> > > > > > > > > > > > >
> > > > > > > > > > > > > Blog: https://segmentfault.com/u/chanjarster
> > > > > > > > > > > > > github: https://github.com/chanjarster
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > Daniel Qian
> > > > > > > > > > >
> > > > > > > > > > > Blog: https://segmentfault.com/u/chanjarster
> > > > > > > > > > > github: https://github.com/chanjarster
> > > > >
> > > > > --
> > > > > Daniel Qian
> > > > >
> > > > > Blog: https://segmentfault.com/u/chanjarster
> > > > > github: https://github.com/chanjarster
> >
> > --
> > Daniel Qian
> >
> > Blog: https://segmentfault.com/u/chanjarster
> > github: https://github.com/chanjarster

--
Daniel Qian

Blog: https://segmentfault.com/u/chanjarster
github: https://github.com/chanjarster
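
P.S. To make the producer-consumer concern from the thread concrete, here is a minimal sketch in plain Java, not ServiceComb Pack code. It assumes a hypothetical immutable TxContext like the one proposed above; the Task wrapper, the CURRENT holder and the TxContextHandoff class are made up for illustration. The consumer thread already exists before the producer sets its context, so InheritableThreadLocal gives it nothing, while a context carried explicitly with the task still arrives.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

class TxContextHandoff {

  // Hypothetical immutable DTO, mirroring the TxContext idea from the thread.
  static final class TxContext {
    final String globalTxId;
    final String localTxId;

    TxContext(String globalTxId, String localTxId) {
      this.globalTxId = globalTxId;
      this.localTxId = localTxId;
    }
  }

  // Hypothetical unit of work that carries its context explicitly.
  static final class Task {
    final TxContext context;
    final String payload;

    Task(TxContext context, String payload) {
      this.context = context;
      this.payload = payload;
    }
  }

  // Stand-in for a per-thread context holder (an assumption, not the real OmegaContext).
  static final InheritableThreadLocal<TxContext> CURRENT = new InheritableThreadLocal<>();

  // Returns {context seen via the thread-local, context seen via the task}.
  static String[] run() throws Exception {
    ExecutorService consumer = Executors.newSingleThreadExecutor();
    // Create the worker thread before any context is set, as a long-lived consumer would be.
    consumer.submit(() -> {}).get();

    BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
    CURRENT.set(new TxContext("global-1", "local-1"));
    queue.put(new Task(CURRENT.get(), "book-car"));

    Future<String[]> result = consumer.submit(() -> {
      Task task = queue.take();
      TxContext inherited = CURRENT.get(); // null: the worker was created before set()
      return new String[] {
          inherited == null ? "none" : inherited.globalTxId,
          task.context.globalTxId
      };
    });

    String[] seen = result.get();
    consumer.shutdown();
    CURRENT.remove();
    return seen;
  }

  public static void main(String[] args) throws Exception {
    String[] seen = run();
    System.out.println("inherited=" + seen[0] + ", explicit=" + seen[1]);
  }
}
```

Because the DTO is immutable, handing the same instance to another thread through the queue needs no extra synchronization beyond what the queue already provides.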
