Hi Willem, I threw an exception on purpose, and the Alpha side got an exception
like this:

09:02:26.053 [grpc-default-executor-3] ERROR
io.grpc.internal.SerializingExecutor - Exception while executing runnable
io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@435c0712
io.grpc.StatusRuntimeException: CANCELLED: call already cancelled

The Omega side got an exception like this:

2019-07-25 09:02:26.046 ERROR 31040 --- [ault-executor-2]
o.a.s.p.o.c.g.c.ReconnectStreamObserver  : Failed to process grpc
coordinate command.
io.grpc.StatusRuntimeException: CANCELLED: Failed to read message.
...
Caused by: java.lang.RuntimeException: eee
at
org.apache.servicecomb.pack.omega.transaction.CompensationMessageHandler.onReceive(CompensationMessageHandler.java:36)
~[classes/:na]

Then it tries to reconnect to Alpha:

2019-07-25 09:02:26.048  INFO 31040 --- [pool-2-thread-1]
.a.s.p.o.c.g.c.PushBackReconnectRunnable : Retry connecting to alpha at
localhost:8080
2019-07-25 09:02:26.055 ERROR 31040 --- [pool-2-thread-1]
.a.s.p.o.c.g.c.PushBackReconnectRunnable : Failed to reconnect to alpha at
localhost:8080
io.grpc.StatusRuntimeException: UNKNOWN
...
2019-07-25 09:02:29.055  INFO 31040 --- [pool-2-thread-1]
.a.s.p.o.c.g.c.PushBackReconnectRunnable : Retry connecting to alpha at
localhost:8080
2019-07-25 09:02:29.062  INFO 31040 --- [pool-2-thread-1]
.a.s.p.o.c.g.c.PushBackReconnectRunnable : Retry connecting to alpha at
localhost:8080 is successful

Besides, the txevent table recorded events like this:

booking   SagaStartedEvent
car       TxStartedEvent
car       TxEndedEvent
hotel     TxStartedEvent
hotel     TxAbortedEvent
booking   TxAbortedEvent

And the command table looks like this:

car       PENDING


Willem Jiang <[email protected]> wrote on Thu, Jul 25, 2019 at 8:54 AM:

> You don't need to remove the TxCompensatedEvent sending code.
> I think you just need to throw a runtime exception in the onReceive
> method to see whether Alpha can get the error.
>
>
> Willem Jiang
>
> Twitter: willemjiang
> Weibo: 姜宁willem
>
> On Wed, Jul 24, 2019 at 10:33 PM Daniel Qian <[email protected]>
> wrote:
> >
> > Hi Willem, I did some experiments on CompensationMessageHandler.
> >
> > In the first case, I sent a delayed TxCompensatedEvent to simulate an async
> > TxCompensated notification:
> >
> > @Override
> > public void onReceive(final String globalTxId, final String localTxId,
> >     final String parentTxId, final String compensationMethod,
> >     Object... payloads) {
> >   // Run the compensation method as before.
> >   context.apply(globalTxId, localTxId, compensationMethod, payloads);
> >   // Send TxCompensatedEvent 10 seconds later from another thread to
> >   // simulate an asynchronous compensation notification.
> >   new Thread(new Runnable() {
> >     @Override
> >     public void run() {
> >       try {
> >         Thread.sleep(10000L);
> >       } catch (InterruptedException e) {
> >         Thread.currentThread().interrupt(); // restore the flag and give up
> >         return;
> >       }
> >       sender.send(new TxCompensatedEvent(globalTxId, localTxId,
> >           parentTxId, compensationMethod));
> >     }
> >   }).start();
> > }
> >
> > I watched the command table and saw the compensation command status
> > transition from PENDING to DONE.
> > I also watched the txevent table and found that SagaEndedEvent was recorded correctly.
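The delayed notification in the first experiment can also be written with a `ScheduledExecutorService` instead of a raw thread plus `Thread.sleep`. This is a minimal sketch under stated assumptions: `EventSender` and `CompensatedEvent` are hypothetical stand-ins for Omega's sender and `TxCompensatedEvent`, and the compensation itself is passed in as a `Runnable`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for TxCompensatedEvent (not the real Omega class).
final class CompensatedEvent {
  final String globalTxId;
  final String localTxId;

  CompensatedEvent(String globalTxId, String localTxId) {
    this.globalTxId = globalTxId;
    this.localTxId = localTxId;
  }
}

// Hypothetical stand-in for the Omega-side message sender.
interface EventSender {
  void send(CompensatedEvent event);
}

final class DelayedCompensationHandler {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private final EventSender sender;
  private final long delayMillis;

  DelayedCompensationHandler(EventSender sender, long delayMillis) {
    this.sender = sender;
    this.delayMillis = delayMillis;
  }

  // Runs the compensation right away, then reports it after a delay to
  // simulate an asynchronous "compensated" notification.
  void onReceive(String globalTxId, String localTxId, Runnable compensation) {
    compensation.run();
    scheduler.schedule(
        () -> sender.send(new CompensatedEvent(globalTxId, localTxId)),
        delayMillis, TimeUnit.MILLISECONDS);
  }

  // Stops accepting new work; delayed tasks that were already scheduled still
  // run, which is the default shutdown policy of ScheduledThreadPoolExecutor.
  boolean shutdownAndWait(long timeoutMillis) {
    scheduler.shutdown();
    try {
      return scheduler.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

A single scheduler thread serves all delayed notifications, so many pending compensations do not each hold a sleeping thread.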
> >
> > In the second case, I just removed the TxCompensatedEvent sending code:
> >
> > @Override
> > public void onReceive(final String globalTxId, final String localTxId,
> >     final String parentTxId, final String compensationMethod,
> >     Object... payloads) {
> >   context.apply(globalTxId, localTxId, compensationMethod, payloads);
> > }
> >
> > In the command table, the compensation command remains PENDING, and in the
> > txevent table no SagaEndedEvent is recorded.
> >
> > Zheng Feng <[email protected]> wrote on Wed, Jul 24, 2019 at 4:07 PM:
> > >
> > > So gRPC could support the async invocation?
> > >
> > > Willem Jiang <[email protected]> wrote on Wed, Jul 24, 2019 at 2:31 PM:
> > >
> > > > Hi Daniel,
> > > >
> > > > If you take a look at the Omega side, the CompensationMessageHandler
> > > > is called in the onNext() method of GrpcCompensateStreamObserver.
> > > > If something goes wrong in the CompensationMessageHandler
> > > > onReceive() method, the stream could be broken and Alpha can know about
> > > > it immediately. (You can write a simple test case to verify it.)
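Willem's point can be illustrated without gRPC. The toy model below is hypothetical (`CommandStream` is neither a ServiceComb nor a gRPC class); it only shows the idea that an exception escaping the handler inside `onNext()` breaks the stream and is reported to the peer right away, whereas a handler that quietly returns without sending `TxCompensatedEvent` leaves the peer with nothing to observe.

```java
import java.util.function.Consumer;

// Hypothetical toy model of the Omega-side command stream; it does not use gRPC.
final class CommandStream {
  private final Consumer<String> handler;         // plays the role of CompensationMessageHandler
  private final Consumer<Throwable> peerNotifier; // what the Alpha side would observe
  private boolean open = true;

  CommandStream(Consumer<String> handler, Consumer<Throwable> peerNotifier) {
    this.handler = handler;
    this.peerNotifier = peerNotifier;
  }

  // Delivers one compensation command. Returns false once the stream is broken.
  boolean onNext(String command) {
    if (!open) {
      return false;
    }
    try {
      handler.accept(command);
      return true;
    } catch (RuntimeException e) {
      open = false;           // the call is cancelled ...
      peerNotifier.accept(e); // ... and the peer learns about it immediately
      return false;
    }
  }

  boolean isOpen() {
    return open;
  }
}
```

This matches the logs at the top of the thread in spirit: the `RuntimeException` thrown from `onReceive` surfaces as a cancelled call on both sides.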
> > > >
> > > > On Wed, Jul 24, 2019 at 12:27 PM Daniel Qian <[email protected]>
> > > > wrote:
> > > > >
> > > > > Thanks for sharing your thoughts, Willem, but I didn't really get your
> > > > > point.
> > > > >
> > > > > I read the code and found that compensation works like this:
> > > > >
> > > > > 1. Alpha GrpcOmegaCallback sends GrpcCompensateCommand to Omega
> > > > > 2. Omega CompensationMessageHandler handles the command, executes the
> > > > > compensation logic and sends TxCompensatedEvent to Alpha
> > > > >
> > > > > In this process, Alpha doesn't wait for a compensation success message
> > > > > after sending the compensation command; it just returns. So the process
> > > > > is already async.
> > > > >
> > > > > If what I said above is correct, please read the following; if not, please
> > > > > correct me.
> > > > >
> > > > > With the current implementation, it's possible that a compensation command
> > > > > is sent but no compensation success message is received. I think what you
> > > > > are saying is that Alpha should have a mechanism to handle a compensation
> > > > > timeout: retry, or just log it. But I think this improvement should be done
> > > > > on the Alpha side. Am I right?
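An Alpha-side timeout for compensation commands could look roughly like the sketch below. It is an illustration under assumptions, not existing Alpha code: `PendingCompensations` is a hypothetical in-memory tracker keyed by local transaction id, the clock is passed in explicitly so the logic is easy to test, and the class is not thread-safe. A real implementation would more likely work off the persisted command table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical Alpha-side tracker: remembers when each compensation command was
// sent and reports those whose TxCompensatedEvent did not arrive in time.
// Not thread-safe; meant only to show the bookkeeping.
final class PendingCompensations {
  private final long timeoutMillis;
  private final Map<String, Long> sentAt = new HashMap<>(); // localTxId -> send time

  PendingCompensations(long timeoutMillis) {
    this.timeoutMillis = timeoutMillis;
  }

  void commandSent(String localTxId, long nowMillis) {
    sentAt.put(localTxId, nowMillis);
  }

  // Called when TxCompensatedEvent arrives; the command is no longer pending.
  void compensated(String localTxId) {
    sentAt.remove(localTxId);
  }

  // Returns and forgets the commands that timed out, so the caller can retry
  // them, mark them as failed, or just log them.
  List<String> expire(long nowMillis) {
    List<String> expired = new ArrayList<>();
    sentAt.entrySet().removeIf(e -> {
      boolean late = nowMillis - e.getValue() >= timeoutMillis;
      if (late) {
        expired.add(e.getKey());
      }
      return late;
    });
    return expired;
  }
}
```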
> > > > >
> > > > > In my opinion, to support async compensation, the only thing we need to do
> > > > > is provide a way to let user code send TxCompensatedEvent. Of course, I don't
> > > > > mean that we should expose the underlying communication layer
> > > > > (GrpcSagaClientMessageSender, for example) to user code. Providing an
> > > > > encapsulated interface would be better, just like what I mentioned before.
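One possible shape for such an encapsulated interface is sketched below. All names are hypothetical (`CompensationReporter` and `EventTransport` do not exist in Omega); `EventTransport` stands in for whatever internal sender, such as GrpcSagaClientMessageSender, Omega would use behind the facade.

```java
// Hypothetical user-facing facade (not an existing Omega API): user code reports
// that an asynchronous compensation finished without touching the transport.
interface CompensationReporter {
  void compensated(String globalTxId, String localTxId);
}

// Hypothetical low-level transport, standing in for an internal sender such as
// GrpcSagaClientMessageSender; user code never sees it.
interface EventTransport {
  void send(String eventType, String globalTxId, String localTxId);
}

final class TransportCompensationReporter implements CompensationReporter {
  private final EventTransport transport;

  TransportCompensationReporter(EventTransport transport) {
    this.transport = transport;
  }

  @Override
  public void compensated(String globalTxId, String localTxId) {
    // The facade decides which event to build; callers only supply the ids.
    transport.send("TxCompensatedEvent", globalTxId, localTxId);
  }
}
```

Keeping the event type inside the facade means user code cannot send an arbitrary event, which is the main reason to prefer it over exposing the sender.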
> > > > >
> > > > > Zheng Feng <[email protected]> wrote on Tue, Jul 23, 2019 at 2:10 PM:
> > > > > >
> > > > > > OK, I understand. From my previous experience with JTA and XA, this looks
> > > > > > similar to XA recovery (XAResource.recover()), which returns the in-doubt
> > > > > > transactions. So the Omega side or the application has to keep records of
> > > > > > the pending transactions, is that right?
> > > > > >
> > > > > > Willem Jiang <[email protected]> wrote on Mon, Jul 22, 2019 at 11:51 PM:
> > > > > >
> > > > > > > Yes, we can kick off the recovery process or a status verification process in
> > > > > > > Alpha if the distributed transaction is in a suspended state, but in this
> > > > > > > case we need Alpha to talk to the customer application to decide whether we
> > > > > > > need to do the recovery again or just do some auditing there. It depends on
> > > > > > > the customer's configuration.
> > > > > > >
> > > > > > > On Mon, Jul 22, 2019 at 11:38 PM Zheng Feng <[email protected]>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi Willem,
> > > > > > > >
> > > > > > > > In terms of providing some further processing extension, do you mean that
> > > > > > > > the customer could help the state machine recover from the suspension
> > > > > > > > states?
> > > > > > > >
> > > > > > > > Willem Jiang <[email protected]> wrote on Mon, Jul 22, 2019 at 11:28 PM:
> > > > > > > >
> > > > > > > > > If we provide the async compensation call, we need to do lots of things
> > > > > > > > > on the Alpha and Omega sides. My suggestion is that we just provide a sync
> > > > > > > > > way to get the feedback of compensation immediately, to keep the design
> > > > > > > > > simpler.
> > > > > > > > >
> > > > > > > > > As we are reimplementing Alpha with a state machine, there are some
> > > > > > > > > suspension states which could be caused by a timeout or a missing
> > > > > > > > > event message.
> > > > > > > > > I had a long talk with ZhangLei, and we agree that we could leave the
> > > > > > > > > status check or recovery to the customer by providing some further
> > > > > > > > > processing extension, as there are too many details in customer code
> > > > > > > > > to think about.
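The suspension idea can be pictured with a deliberately small state machine. This is a hypothetical simplification, not the actual Alpha state machine: the state and event names are invented, and a compensation that is neither confirmed nor reported as failed before its deadline lands in SUSPENDED, where a customer-provided extension would decide between recovery and auditing.

```java
// Hypothetical, heavily simplified sub-transaction lifecycle; the names are
// invented for this sketch and do not come from the Alpha code base.
enum SubTxState { COMPENSATION_SENT, COMPENSATED, COMPENSATION_FAILED, SUSPENDED }

enum SubTxEvent { TX_COMPENSATED, TX_COMPENSATION_FAILED, TIMEOUT }

final class SubTxStateMachine {
  private SubTxState state = SubTxState.COMPENSATION_SENT;

  SubTxState state() {
    return state;
  }

  SubTxState fire(SubTxEvent event) {
    if (state != SubTxState.COMPENSATION_SENT) {
      // Terminal or suspended: this sketch ignores late events and leaves
      // SUSPENDED to a customer-provided recovery/auditing extension.
      return state;
    }
    switch (event) {
      case TX_COMPENSATED:
        state = SubTxState.COMPENSATED;
        break;
      case TX_COMPENSATION_FAILED:
        state = SubTxState.COMPENSATION_FAILED;
        break;
      case TIMEOUT:
        state = SubTxState.SUSPENDED; // the expected event message never arrived
        break;
    }
    return state;
  }
}
```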
> > > > > > > > >
> > > > > > > > > Anyway, it's my pleasure to have this kind of discussion with the
> > > > > > > > > team. This is the beauty of open source project development: we are
> > > > > > > > > tackling interesting problems by working together across different
> > > > > > > > > companies :)
> > > > > > > > >
> > > > > > > > > On Mon, Jul 22, 2019 at 5:14 PM Zheng Feng <[email protected]>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > In terms of the compensation method, I think I had discussed with Willem
> > > > > > > > > > before that we could introduce a @Status annotation for the Alpha server
> > > > > > > > > > to query the compensation status. When the compensation method is async,
> > > > > > > > > > which means it cannot respond immediately, it would return the
> > > > > > > > > > COMPENSATING result, and the Alpha server could query the @Status method
> > > > > > > > > > to check the compensation status. If this method returns COMPENSATE_OK,
> > > > > > > > > > the Alpha server will mark the local transaction as compensated;
> > > > > > > > > > otherwise it will mark it as compensate_failed.
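Since @Status is only a proposal in this thread, the sketch below models the user's status method as a plain `Supplier`. The names `CompensationResult` and `StatusPoller`, the `COMPENSATE_FAILED` spelling, and the bounded polling loop are assumptions; the proposal itself only says that COMPENSATE_OK means compensated and anything else means failed.

```java
import java.util.function.Supplier;

// Hypothetical result values taken from the proposal; the FAILED spelling is assumed.
enum CompensationResult { COMPENSATING, COMPENSATE_OK, COMPENSATE_FAILED }

final class StatusPoller {
  // Asks the (hypothetical) @Status method until it stops answering COMPENSATING,
  // up to maxAttempts times. COMPENSATE_OK means compensated; anything else,
  // including still COMPENSATING after the last attempt, is treated as failed.
  static CompensationResult resolve(Supplier<CompensationResult> statusMethod,
                                    int maxAttempts, long pauseMillis) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      CompensationResult result = statusMethod.get();
      if (result == CompensationResult.COMPENSATE_OK) {
        return CompensationResult.COMPENSATE_OK;
      }
      if (result != CompensationResult.COMPENSATING) {
        return CompensationResult.COMPENSATE_FAILED;
      }
      if (attempt < maxAttempts) {
        try {
          Thread.sleep(pauseMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // keep the interrupt for the caller
          break;
        }
      }
    }
    return CompensationResult.COMPENSATE_FAILED;
  }
}
```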
> > > > > > > > > >
> > > > > > > > > > Daniel Qian <[email protected]> wrote on Sun, Jul 21, 2019 at 8:37 PM:
> > > > > > > > > >
> > > > > > > > > > > I rethought the idea I proposed. Yes, providing low-level APIs is a bad
> > > > > > > > > > > idea, and I also don't suggest letting user code use the
> > > > > > > > > > > omega-xxx-transport API.
> > > > > > > > > > >
> > > > > > > > > > > I think there are three essential issues:
> > > > > > > > > > >
> > > > > > > > > > >    1. How to pass tx context info across threads.
> > > > > > > > > > >    2. How to asynchronously tell Alpha that the Saga has ended or aborted,
> > > > > > > > > > >    i.e. not triggered when the @SagaStart method returns.
> > > > > > > > > > >    3. How to asynchronously tell Alpha that the LocalTx has ended or
> > > > > > > > > > >    aborted, i.e. not triggered when the @Compensable method returns.
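For the first issue, one approach that works even in a producer-consumer setup is to make the immutable context part of the work item itself, so the consumer never reads it from a thread-local. The sketch is hypothetical: `TxCtx`, `TxJob` and `ContextHandoff` are illustrative names, with `TxCtx` shaped like the TransactionContext discussed in this thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical immutable context, shaped like the TransactionContext discussed here.
final class TxCtx {
  final String globalTxId;
  final String localTxId;

  TxCtx(String globalTxId, String localTxId) {
    this.globalTxId = globalTxId;
    this.localTxId = localTxId;
  }
}

// A work item that carries its own transaction context across the thread hop.
final class TxJob {
  final TxCtx ctx;
  final String payload;

  TxJob(TxCtx ctx, String payload) {
    this.ctx = ctx;
    this.payload = payload;
  }
}

final class ContextHandoff {
  // The producer enqueues a job; a separate consumer thread reads the context
  // from the job, not from a ThreadLocal. Returns the globalTxId the consumer saw.
  static String handOff(TxCtx ctx) {
    BlockingQueue<TxJob> queue = new ArrayBlockingQueue<>(1);
    String[] seen = new String[1];
    Thread consumer = new Thread(() -> {
      try {
        TxJob job = queue.take();
        seen[0] = job.ctx.globalTxId;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();
    try {
      queue.put(new TxJob(ctx, "book car"));
      consumer.join(); // join() also makes seen[0] visible to this thread
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return seen[0];
  }
}
```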
> > > > > > > > > > >
> > > > > > > > > > > I think we can keep using @SagaStart and @Compensable for the
> > > > > > > > > > > XXXStartedEvent, and provide a helper to manually end/abort the
> > > > > > > > > > > Saga/LocalTx. Thanks to PR #506 (SCB-1385), we can use TransactionContext
> > > > > > > > > > > to achieve that. Below is a code sample:
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > @SagaStart(async=true)
> > > > > > > > > > > public void foo() {
> > > > > > > > > > >    TransactionContext txContext = OmegaContext.getTransactionContext();
> > > > > > > > > > >    someAsyncCall()
> > > > > > > > > > >      .onSuccess(Callback() {
> > > > > > > > > > >         omega.endSaga(txContext);
> > > > > > > > > > >      })
> > > > > > > > > > >      .onException(Callback() {
> > > > > > > > > > >         omega.abortSaga(txContext);
> > > > > > > > > > >      });
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > @Compensable(async=true, compensationMethod="rollbackBar")
> > > > > > > > > > > public void bar() {
> > > > > > > > > > >    TransactionContext txContext = OmegaContext.getTransactionContext();
> > > > > > > > > > >    someAsyncCall()
> > > > > > > > > > >      .onSuccess(Callback() {
> > > > > > > > > > >         omega.endTx(txContext);
> > > > > > > > > > >      })
> > > > > > > > > > >      .onException(Callback() {
> > > > > > > > > > >         omega.abortTx(txContext);
> > > > > > > > > > >      });
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > The async attribute on @SagaStart and @Compensable prevents the
> > > > > > > > > > > Saga/LocalTx from being ended when the method returns.
> > > > > > > > > > > The TransactionContext object can be passed around safely because it's
> > > > > > > > > > > immutable.
> > > > > > > > > > > What I have not yet thought through is how to deal with the compensation
> > > > > > > > > > > method if it is also async.
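The `foo()` part of the proposal can be approximated with `CompletableFuture`, as long as the context is captured before the async call. In this sketch `AsyncSagaOmega`, `SagaTxContext` and `AsyncBooking` are hypothetical (no such API exists in Omega, and the `async=true` behaviour is simply assumed); the point is that the immutable context is captured by the callback, and the saga is ended or aborted from whichever thread completes the future.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical API from the proposal above; it does not exist in Omega.
interface AsyncSagaOmega {
  void endSaga(SagaTxContext ctx);
  void abortSaga(SagaTxContext ctx);
}

// Immutable, so a callback running on another thread can safely capture it.
final class SagaTxContext {
  final String globalTxId;

  SagaTxContext(String globalTxId) {
    this.globalTxId = globalTxId;
  }
}

final class AsyncBooking {
  private final AsyncSagaOmega omega;

  AsyncBooking(AsyncSagaOmega omega) {
    this.omega = omega;
  }

  // Stands in for foo(): the saga is NOT ended when this method returns; it is
  // ended or aborted later, when the asynchronous call completes.
  CompletableFuture<String> foo(SagaTxContext txContext, CompletableFuture<String> asyncCall) {
    return asyncCall.whenComplete((result, error) -> {
      if (error == null) {
        omega.endSaga(txContext);
      } else {
        omega.abortSaga(txContext);
      }
    });
  }
}
```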
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Willem Jiang <[email protected]> wrote on Sat, Jul 20, 2019 at 10:30 PM:
> > > > > > > > > > > >
> > > > > > > > > > > > Yeah, I agree we need to provide some low-level API for the user to pass
> > > > > > > > > > > > the transaction context. In the recent change for SCB-1386, I introduced
> > > > > > > > > > > > the TransactionContext object, which holds the references of the GID and
> > > > > > > > > > > > LID; we may add some other transaction context information there too.
> > > > > > > > > > > > If the sub transaction happens in another JVM, we need to pass the
> > > > > > > > > > > > TxContext across JVMs with the help of omega-xxx-transport.
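Passing the context across JVMs usually means copying the ids into request metadata on the caller side and reading them back on the callee side. The helper below is a hypothetical sketch: the header names are invented and are not necessarily the ones the omega transport modules use.

```java
import java.util.HashMap;
import java.util.Map;

// Immutable view of the context received from the caller's JVM.
final class RemoteTxContext {
  final String globalTxId;
  final String parentTxId;

  RemoteTxContext(String globalTxId, String parentTxId) {
    this.globalTxId = globalTxId;
    this.parentTxId = parentTxId;
  }
}

// Hypothetical helper; the header names are invented for this sketch.
final class TxHeaders {
  static final String GLOBAL_TX_ID = "x-example-global-tx-id";
  static final String LOCAL_TX_ID = "x-example-local-tx-id";

  // Caller side: copy the ids into the outgoing request headers.
  static Map<String, String> inject(String globalTxId, String localTxId) {
    Map<String, String> headers = new HashMap<>();
    headers.put(GLOBAL_TX_ID, globalTxId);
    headers.put(LOCAL_TX_ID, localTxId);
    return headers;
  }

  // Callee side: the caller's local tx becomes the parent of the sub transaction
  // started here. Returns null when the request is not part of a saga.
  static RemoteTxContext extract(Map<String, String> headers) {
    String globalTxId = headers.get(GLOBAL_TX_ID);
    if (globalTxId == null) {
      return null;
    }
    return new RemoteTxContext(globalTxId, headers.get(LOCAL_TX_ID));
  }
}
```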
> > > > > > > > > > > >
> > > > > > > > > > > > We already have some internal APIs to send messages from Omega to Alpha.
> > > > > > > > > > > > I prefer to use annotations instead of exposing a low-level API to the
> > > > > > > > > > > > user.
> > > > > > > > > > > >
> > > > > > > > > > > > On Sat, Jul 20, 2019 at 9:50 PM Daniel Qian <[email protected]>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > After looking into SCB-163, SCB-1385 and SCB-1386, I have some thoughts on
> > > > > > > > > > > > > Sagas involving async invocation.
> > > > > > > > > > > > > The current implementation is basically based on sync invocation, with
> > > > > > > > > > > > > some assumptions:
> > > > > > > > > > > > >
> > > > > > > > > > > > >    1. When the @SagaStart method returns, the Saga is finished.
> > > > > > > > > > > > >    2. When the @Compensable method returns/throws an exception, the Local
> > > > > > > > > > > > >    Tx succeeds/fails.
> > > > > > > > > > > > >    3. When the compensationMethod returns, the Local Tx is compensated.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Even considering what SCB-100 provided:
> > > > > > > > > > > > >
> > > > > > > > > > > > >    1. Add the @OmegaContextAware annotation, enabling a
> > > > > > > > > > > > >    java.util.concurrent.Executor to inject OmegaContext into the threads
> > > > > > > > > > > > >    it manages/spawns
> > > > > > > > > > > > >    2. Make OmegaContext use an InheritableThreadLocal field to let child
> > > > > > > > > > > > >    threads inherit the parent thread's Local Tx info
> > > > > > > > > > > > >
> > > > > > > > > > > > > There are still some limitations:
> > > > > > > > > > > > >
> > > > > > > > > > > > >    1. @OmegaContextAware is only viable if you use the Spring Framework.
> > > > > > > > > > > > >    2. @OmegaContextAware and OmegaContext's InheritableThreadLocal field
> > > > > > > > > > > > >    assume that the calling thread or initiator thread has Local Tx info.
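The second limitation can be demonstrated with the JDK alone: an `InheritableThreadLocal` value is copied only when a thread is created, so a pooled worker that already exists (as in a producer-consumer setup) does not see the submitter's value. `InheritanceDemo` and its `GLOBAL_TX_ID` field are illustrative names, not Omega code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class InheritanceDemo {
  // Illustrative stand-in for the tx info OmegaContext keeps per thread.
  static final InheritableThreadLocal<String> GLOBAL_TX_ID = new InheritableThreadLocal<>();

  // Returns {value seen by a freshly spawned child, value seen by a pre-started pool worker}.
  static String[] run() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      pool.submit(() -> { }).get(); // force the worker thread to exist before any value is set

      GLOBAL_TX_ID.set("g-1");      // the initiator thread now has tx info

      String[] fromChild = new String[1];
      Thread child = new Thread(() -> fromChild[0] = GLOBAL_TX_ID.get());
      child.start();                // created after set(): inherits "g-1"
      child.join();

      String fromPool = pool.submit(() -> GLOBAL_TX_ID.get()).get(); // created before set(): null
      return new String[] {fromChild[0], fromPool};
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      GLOBAL_TX_ID.remove();
      pool.shutdown();
    }
  }
}
```

Running it yields `"g-1"` for the child thread and `null` for the pooled worker, which is exactly the gap the questions below are about.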
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > What if user code uses a producer-consumer pattern, in which
> > > > > > > > > > > > > InheritableThreadLocal can't work?
> > > > > > > > > > > > > What if user code uses a thread scheduling library with which we cannot
> > > > > > > > > > > > > use @OmegaContextAware (RxJava and Reactor, for example)?
> > > > > > > > > > > > > I think we could provide some low-level APIs with which user code can
> > > > > > > > > > > > > manually start/end a Saga and Local Tx, something like below:
> > > > > > > > > > > > >
> > > > > > > > > > > > > TxContext context = omega.startSaga();
> > > > > > > > > > > > > TxContext subTxContext = omega.startTx(TxContext parentTxContext);
> > > > > > > > > > > > > omega.endTx(TxContext);
> > > > > > > > > > > > > omega.abortTx(TxContext);
> > > > > > > > > > > > > omega.abortSaga(TxContext);
> > > > > > > > > > > > > omega.endSaga(TxContext);
> > > > > > > > > > > > >
> > > > > > > > > > > > > TxContext is just an immutable DTO like this:
> > > > > > > > > > > > >
> > > > > > > > > > > > > public class TxContext {
> > > > > > > > > > > > >   private final String globalTxId;
> > > > > > > > > > > > >   private final String localTxId;
> > > > > > > > > > > > > }
> > > > > > > > > > > > >
> > > > > > > > > > > > > The above is just a rough idea. Any thoughts?
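To make the proposed low-level API concrete, here is a hypothetical in-memory sketch of it. `RecordingOmega` and `ManualTxContext` are illustrative names; instead of talking to Alpha, each call just records its name, and in this sketch the saga's local id simply reuses its global id (an assumption).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Immutable value object, as proposed above.
final class ManualTxContext {
  final String globalTxId;
  final String localTxId;

  ManualTxContext(String globalTxId, String localTxId) {
    this.globalTxId = globalTxId;
    this.localTxId = localTxId;
  }
}

// Hypothetical in-memory version of the proposed API; it records calls instead
// of sending events to Alpha.
final class RecordingOmega {
  final List<String> calls = new ArrayList<>();

  ManualTxContext startSaga() {
    String globalTxId = UUID.randomUUID().toString();
    calls.add("startSaga");
    return new ManualTxContext(globalTxId, globalTxId); // assumed: saga local id == global id
  }

  // A sub transaction shares the saga's global id but gets its own local id.
  ManualTxContext startTx(ManualTxContext parent) {
    calls.add("startTx");
    return new ManualTxContext(parent.globalTxId, UUID.randomUUID().toString());
  }

  void endTx(ManualTxContext ctx) { calls.add("endTx"); }
  void abortTx(ManualTxContext ctx) { calls.add("abortTx"); }
  void endSaga(ManualTxContext ctx) { calls.add("endSaga"); }
  void abortSaga(ManualTxContext ctx) { calls.add("abortSaga"); }
}
```

Because every call takes the context explicitly, these calls can be made from any thread, which is what the producer-consumer and RxJava/Reactor cases need.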
> > > > > > > > > > > > > --
> > > > > > > > > > > > > Daniel Qian
> > > > > > > > > > > > >
> > > > > > > > > > > > > Blog: https://segmentfault.com/u/chanjarster
> > > > > > > > > > > > > GitHub: https://github.com/chanjarster
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > >
> > > > >
> > > > >
> > > >
> >
> >
> >
>


-- 
Daniel Qian

Blog: https://segmentfault.com/u/chanjarster
GitHub: https://github.com/chanjarster
