Hi Daniel,

If you take a look at the Omega side, the CompensationMessageHandler
is called in the onNext() method of GrpcCompensateStreamObserver.
If something goes wrong in the CompensationMessageHandler onReceive()
method, the stream could be broken and Alpha can know about
it immediately. (You can write a simple test case to verify it.)
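
As a rough illustration of the point above, here is a minimal, self-contained
sketch with no gRPC dependency. All the types in it (MessageHandler,
CompensateObserver, AlphaSide) are hypothetical stand-ins, not the real
ServiceComb Pack classes, and a direct method call only imitates the stream:
an exception thrown by the handler escapes from onNext(), so the sending side
learns about the failure right away instead of waiting for a timeout.

```java
import java.util.ArrayList;
import java.util.List;

public class CompensationStreamSketch {

  /** Hypothetical stand-in for Omega's compensation handler. */
  interface MessageHandler {
    void onReceive(String globalTxId, String localTxId);
  }

  /** Hypothetical stand-in for the stream observer on the Omega side. */
  static final class CompensateObserver {
    private final MessageHandler handler;

    CompensateObserver(MessageHandler handler) {
      this.handler = handler;
    }

    // The handler runs inside onNext(); any exception it throws escapes here.
    void onNext(String globalTxId, String localTxId) {
      handler.onReceive(globalTxId, localTxId);
    }
  }

  /** Hypothetical stand-in for Alpha: sends the command, records failures. */
  static final class AlphaSide {
    final List<String> failedLocalTxIds = new ArrayList<>();

    boolean sendCompensate(CompensateObserver omega, String globalTxId, String localTxId) {
      try {
        omega.onNext(globalTxId, localTxId);
        return true;
      } catch (RuntimeException e) {
        // In this sketch a thrown exception plays the role of the broken stream.
        failedLocalTxIds.add(localTxId);
        return false;
      }
    }
  }

  public static void main(String[] args) {
    AlphaSide alpha = new AlphaSide();
    CompensateObserver healthy = new CompensateObserver((g, l) -> { });
    CompensateObserver broken = new CompensateObserver((g, l) -> {
      throw new IllegalStateException("compensation failed for " + l);
    });

    System.out.println(alpha.sendCompensate(healthy, "gid-1", "lid-1"));
    System.out.println(alpha.sendCompensate(broken, "gid-1", "lid-2"));
    System.out.println(alpha.failedLocalTxIds);
  }
}
```

In the real setup Alpha and Omega are separate processes, so the failure would
reach Alpha through the gRPC stream rather than as a Java exception; please
treat this only as a picture of the control flow.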

Willem Jiang

Twitter: willemjiang
Weibo: 姜宁willem

On Wed, Jul 24, 2019 at 12:27 PM Daniel Qian <chanjars...@gmail.com> wrote:
>
> Thanks for sharing your thoughts, Willem. But I didn't really get your
> point.
>
> I read the code and found that compensation action works like this:
>
> 1. Alpha's GrpcOmegaCallback sends a GrpcCompensateCommand to Omega
> 2. Omega's CompensationMessageHandler handles the command, executes the
> compensation logic and sends a TxCompensatedEvent to Alpha
>
> In this process, Alpha doesn't wait for the compensation success message
> after sending the compensation command; it just returns. So the process is
> already async.
>
> If what I said above is correct, please read the following, if not, please
> correct me.
>
> With the current implementation, it's possible that a compensation command
> is sent but no compensation success message is received. I think what you
> are saying is that Alpha should have a mechanism to handle compensation
> timeouts, by retrying or just logging them. But I think this is an
> improvement that should be made on the Alpha side. Am I right?
>
> In my opinion, to support async compensation, the only thing we need to do
> is provide a way to let user code send TxCompensatedEvent. Of course, I
> don't mean that we should expose the underlying communication layer,
> GrpcSagaClientMessageSender for example, to user code. Providing an
> encapsulated interface would be better, just like what I mentioned before.
>
> On Tue, Jul 23, 2019 at 2:10 PM Zheng Feng <zh.f...@gmail.com> wrote:
> >
> > OK, I understand. From my previous experience with JTA and XA, it
> > looks similar to XA.recovery(), which gets the in-doubt transactions. So
> > the Omega side or the application has to keep records of the pending
> > transactions, is that right?
> >
> > On Mon, Jul 22, 2019 at 11:51 PM Willem Jiang <willem.ji...@gmail.com> wrote:
> >
> > > Yes, we can kick off the recovery process or status verification
> > > process in Alpha if the distributed transaction is in a suspended
> > > state, but in this case we need Alpha to talk to the customer
> > > application to find out whether we need to do the recovery again or
> > > just do some auditing there. It depends on the customer's configuration.
> > >
> > > Willem Jiang
> > >
> > > Twitter: willemjiang
> > > Weibo: 姜宁willem
> > >
> > > On Mon, Jul 22, 2019 at 11:38 PM Zheng Feng <zh.f...@gmail.com> wrote:
> > > >
> > > > Hi Willem,
> > > >
> > > > By providing some further processing extension, do you mean that
> > > > the customer could help the state machine recover from the
> > > > suspension states?
> > > >
> > > > On Mon, Jul 22, 2019 at 11:28 PM Willem Jiang <willem.ji...@gmail.com> wrote:
> > > >
> > > > > If we provide the async compensation call, we need to do lots of
> > > > > things on the Alpha and Omega sides. My suggestion is that we just
> > > > > provide a sync way to get the feedback of compensation immediately,
> > > > > to keep the design simpler.
> > > > >
> > > > > As we are reimplementing Alpha with a state machine, there are some
> > > > > suspension states which could be caused by a timeout or a missing
> > > > > event message.
> > > > > I had a long talk with ZhangLei, and we agreed that we could leave
> > > > > the status check or recovery to the customer by providing some
> > > > > further processing extension, as there are too many details in
> > > > > customer code to think about.
> > > > >
> > > > > Anyway, it's my pleasure to have this kind of discussion with the
> > > > > team. It's the beauty of open source project development: we are
> > > > > tackling interesting problems by working together across different
> > > > > companies :)
> > > > >
> > > > > Willem Jiang
> > > > >
> > > > > Twitter: willemjiang
> > > > > Weibo: 姜宁willem
> > > > >
> > > > > On Mon, Jul 22, 2019 at 5:14 PM Zheng Feng <zh.f...@gmail.com> wrote:
> > > > > >
> > > > > > In terms of the compensation method, I think I had discussed with
> > > > > > Willem before that we could introduce a @Status annotation for the
> > > > > > Alpha server to query the compensation status. When the compensation
> > > > > > method is async, which means it cannot respond immediately, it would
> > > > > > return the COMPENSATING result, and the Alpha server could query the
> > > > > > @Status method to check the compensation status. If this method
> > > > > > returns COMPENSATE_OK, the Alpha server will mark the local
> > > > > > transaction as compensated; otherwise it will mark it as
> > > > > > compensate_failed.
> > > > > >
> > > > > > On Sun, Jul 21, 2019 at 8:37 PM Daniel Qian <chanjars...@gmail.com> wrote:
> > > > > >
> > > > > > > I have rethought the idea I proposed. Yes, providing low-level
> > > > > > > APIs is a bad idea, and I also don't suggest letting user code use
> > > > > > > the omega-xxx-transport API.
> > > > > > >
> > > > > > > I think there are three essential issues:
> > > > > > >
> > > > > > >    1. How to pass tx context info across threads
> > > > > > >    2. How to asynchronously tell Alpha that a Saga has ended or
> > > > > > >    aborted, which means this is not triggered when the @SagaStart
> > > > > > >    method returns.
> > > > > > >    3. How to asynchronously tell Alpha that a LocalTx has ended or
> > > > > > >    aborted, which means this is not triggered when the @Compensable
> > > > > > >    method returns.
> > > > > > >
> > > > > > > I think we can keep using @SagaStart and @Compensable for the
> > > > > > > XXXStartedEvent, and provide a helper to manually end/abort a
> > > > > > > Saga/LocalTx. Thanks to PR #506 (SCB-1385) we can use
> > > > > > > TransactionContext to achieve that. Below is a code sample:
> > > > > > >
> > > > > > >
> > > > > > > @SagaStart(async=true)
> > > > > > > public void foo() {
> > > > > > >    // Capture the context on the calling thread, before going async.
> > > > > > >    TransactionContext txContext = OmegaContext.getTransactionContext();
> > > > > > >    someAsyncCall()
> > > > > > >      .onSuccess(() -> omega.endSaga(txContext))
> > > > > > >      .onException(() -> omega.abortSaga(txContext));
> > > > > > > }
> > > > > > >
> > > > > > > @Compensable(async=true, compensationMethod="rollbackBar")
> > > > > > > public void bar() {
> > > > > > >    // Capture the context on the calling thread, before going async.
> > > > > > >    TransactionContext txContext = OmegaContext.getTransactionContext();
> > > > > > >    someAsyncCall()
> > > > > > >      .onSuccess(() -> omega.endTx(txContext))
> > > > > > >      .onException(() -> omega.abortTx(txContext));
> > > > > > > }
> > > > > > >
> > > > > > > The async attribute on @SagaStart and @Compensable prevents the
> > > > > > > Saga/LocalTx from being ended when the method returns.
> > > > > > > The TransactionContext object can be passed around safely because
> > > > > > > it's immutable.
> > > > > > > What I have not thought through clearly is how to deal with the
> > > > > > > compensation method if it's also async.
> > > > > > >
> > > > > > >
> > > > > > > On Sat, Jul 20, 2019 at 10:30 PM Willem Jiang <willem.ji...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Yeah, I agree we need to provide some low-level API for the user
> > > > > > > > to pass.
> > > > > > > > In the recent change for SCB-1386, I introduced the
> > > > > > > > TransactionContext object, which holds references to the GID and
> > > > > > > > LID; we may add some other transaction context information there
> > > > > > > > too.
> > > > > > > > If the sub-transaction happens in another JVM, we need to pass
> > > > > > > > the TxContext across JVMs with the help of omega-xxx-transport.
> > > > > > > >
> > > > > > > > We already have some internal APIs to send messages from Omega
> > > > > > > > to Alpha. I prefer to use annotations instead of exposing
> > > > > > > > low-level APIs to the user.
> > > > > > > >
> > > > > > > > Willem Jiang
> > > > > > > >
> > > > > > > > Twitter: willemjiang
> > > > > > > > Weibo: 姜宁willem
> > > > > > > >
> > > > > > > > On Sat, Jul 20, 2019 at 9:50 PM Daniel Qian <chanjars...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > After looking into SCB-163, SCB-1385 and SCB-1386, I have some
> > > > > > > > > thoughts on Saga involved in async invocation.
> > > > > > > > > The current implementation is basically based on sync
> > > > > > > > > invocation; there are some assumptions:
> > > > > > > > >
> > > > > > > > >    1. When the @SagaStart method returns, the Saga is finished.
> > > > > > > > >    2. When the @Compensable method returns/throws an exception,
> > > > > > > > >    the Local Tx succeeds/fails.
> > > > > > > > >    3. When the compensationMethod returns, the Local Tx is
> > > > > > > > >    compensated.
> > > > > > > > >
> > > > > > > > > Even considering what SCB-100 provided:
> > > > > > > > >
> > > > > > > > >    1. Add the @OmegaContextAware annotation, enabling a
> > > > > > > > >    java.util.concurrent.Executor to inject OmegaContext into the
> > > > > > > > >    threads it manages/spawns
> > > > > > > > >    2. Make OmegaContext use an InheritableThreadLocal field to
> > > > > > > > >    let child threads inherit the parent thread's Local Tx info
> > > > > > > > >
> > > > > > > > > There are still some limitations:
> > > > > > > > >
> > > > > > > > >    1. @OmegaContextAware is only viable if you use the Spring
> > > > > > > > >    framework
> > > > > > > > >    2. @OmegaContextAware and OmegaContext's
> > > > > > > > >    InheritableThreadLocal field assume that the calling thread
> > > > > > > > >    or initiator thread has Local Tx info.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > What if user code uses a producer-consumer pattern in which
> > > > > > > > > InheritableThreadLocal can't work?
> > > > > > > > > What if user code uses a thread scheduling library with which
> > > > > > > > > we cannot use @OmegaContextAware, RxJava and Reactor, for
> > > > > > > > > example?
> > > > > > > > > I think we could provide some low-level APIs so that user code
> > > > > > > > > can manually start/end a Saga and Local Tx, something like below:
> > > > > > > > >
> > > > > > > > > TxContext context = omega.startSaga();
> > > > > > > > > TxContext subTxContext = omega.startTx(TxContext parentTxContext);
> > > > > > > > > omega.endTx(TxContext);
> > > > > > > > > omega.abortTx(TxContext);
> > > > > > > > > omega.abortSaga(TxContext);
> > > > > > > > > omega.endSaga(TxContext);
> > > > > > > > >
> > > > > > > > > TxContext is just an immutable DTO like this:
> > > > > > > > >
> > > > > > > > > public class TxContext {
> > > > > > > > >   private final String globalTxId;
> > > > > > > > >   private final String localTxId;
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > The above is just a rough idea. So any thoughts?
> > > > > > > > > --
> > > > > > > > > Daniel Qian
> > > > > > > > >
> > > > > > > > > Blog: https://segmentfault.com/u/chanjarster
> > > > > > > > > GitHub: https://github.com/chanjarster
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Daniel Qian
> > > > > > >
> > > > > > > Blog: https://segmentfault.com/u/chanjarster
> > > > > > > GitHub: https://github.com/chanjarster
> > > > > > >
> > > > >
> > >
>
>
>
> --
> Daniel Qian
>
> Blog: https://segmentfault.com/u/chanjarster
> GitHub: https://github.com/chanjarster
