In sync invocation scenario, we can tell tx is finished once the
execution finished, and tx context
can be obtained from thread local implicitly, that's just how we do in
current implementation.

In async invocation scenario, there are various async patterns,
various libs, various tools to do
async, we cannot tell when tx is finished, and tx context can not be
obtained from thread local
implicitly. So, it's user's responsibility to tell when tx finished,
by sending event to alpha for
example. And it's user's responsibility to pass tx context around in
the tx lifecycle.

I know what willem worries about is that user may send malformed event
to alpha that will cause
bugs hard to debug or troubleshoot. I think we can solve this by using
encapsulation.

public class Omega {
  public OmegaClient createClient() {
    if (!omegaContext.autoClose) {
      throw new IllegalStateException("...");
    }
    return new OmegaClient(...);
  }
}
// OmegaClient is immutable and can be safely shared between threads
public class OmegaClient {
  private final String globalTxId;
  private final String parentTxId;
  private final String localTxId;
  private final TxMode mode;
  public void txEnded() {
    if (mode !=  LOCAL_TX) {
      throw new IllegalStateException("...");
    }
    sender.send(TxEndedEvent(...));
  }

  public void txAborted() {
    if (mode != LOCAL_TX) {
      throw ...
    }
    sender.send(...);
  }

  public void sagaEneded() {
    if (mode != SAGA) {
      throw ...
    }
    sender.send(...);
  }

  public void sagaAborted() {
    if (mode != SAGA) {
      throw ...
    }
    sender.send(...);
  }

  public void compensated() {
    if (mode != COMPENSATION) {
      throw ...
    }
    sender.send(...);
  }
}

public enum TxMode {
  SAGA, LOCAL_TX, COMPENSATION
}

For @Compensable method async invocation:

@Compensable(autoClose=false)
public void foo() {
  OmegaClient omegaClient = omega.createClient();
  new Thread(() -> {
    try {
      // do some work
      omegaClient.txEnded();
    } catch(Exception e) {
      omegaClient.txAborted();
    }
  }).start();
}


For @Saga method async invocation, it's user responsibility to tell
when saga finished.
That could be quite challenging but there are nothing we can do for it.

For pub/sub pattern, i recommend to make @Saga method sync and blocking, that'll
be much easier.

@Saga
public void foo() {
  // this method will block until receive a response
  BarResponse resp = messagingSystem.sendAndReceive(BarCmd());
  ...
}

Willem Jiang <willem.ji...@gmail.com> 于2019年7月31日周三 上午8:29写道:
>
> For the first scenario, we can address it with countdown (and timeout)
> check , as each async invocation only have one local-transaction (or
> sub-transaction).
>
> For the second scenario, as the topic publisher and topic subscribers
> are decoupled, it's impossible for Alpha to find out if all the
> local-transactions are finished or not. This issue could be addressed
> by published the domain object event across all the services in a
> decentralized way. Please check out Chris Richardson's microservice
> Saga pattern[1] for more information.
>
> [1]https://microservices.io/patterns/data/saga.html
>
> Willem Jiang
>
> Twitter: willemjiang
> Weibo: 姜宁willem
>
> On Tue, Jul 30, 2019 at 10:56 PM Zhang Lei <zhang_...@boco.com.cn> wrote:
> >
> > Hi, Team
> >
> > For asynchronous, I think we need to distinguish between two scenario
> >
> > 1. After the asynchronous method is executed, the sub-transaction execution 
> > ends, for example: using internal threads to execute.
> > For this scenario, we allow sub-transactions to be in parallel, but we must 
> > wait for all threads to finish before we can end the main method. same 
> > thread join or use CountDownLatch
> >
> > 2. After the asynchronous method is executed, the sub-transaction is still 
> > in execution, for example, PUB / SUB
> > For this scenario, Can I solve it using TransactionContext?
> >
> > Is there something I have missed?
> >
> > Lei Zhang
> >
> > > 在 2019年7月30日,下午3:28,Willem Jiang <willem.ji...@gmail.com> 写道:
> > >
> > > TxAsyncStarted  is just let Alpha know there is an async transaction
> > > invocation, as the local transaction is not started yet, we can only
> > > connect the TxAsyncStarted  event LocalTransactionID with TxStarted
> > > event ParentTransactionID.
> > > Once Alpha receives the TxAsyncStarted event, it should wait for
> > > TxStarted event, In this way Alpha can tell if all the Saga local
> > > transaction are finished.
> > > it could cause some trouble if the async invocation is based in
> > > pub/sub module, it looks like we cannot know if there are multiple
> > > local transactions will be triggered by a simple message. Unless
> > > customer tells us about it through annotation.
> > >
> > > Willem Jiang
> > >
> > > Twitter: willemjiang
> > > Weibo: 姜宁willem
> > >
> > > On Tue, Jul 30, 2019 at 2:49 PM Zheng Feng <zh.f...@gmail.com> wrote:
> > >>
> > >> Hi Willem,
> > >>
> > >> I am not very clear about the TxAsyncStart event ? what's the difference
> > >> between the TxAyncStart and TxStarted Event. How does the alpha process 
> > >> the
> > >> async events ?
> > >>
> > >> Thanks,
> > >> Zheng Feng
> > >>
> > >> Willem Jiang <willem.ji...@gmail.com> 于2019年7月30日周二 上午8:41写道:
> > >>
> > >>> According to the feedback for PR, I think we need to rethink about the
> > >>> SagaEnd implementation in the Async invocation scenario. Normally it's
> > >>> harmless we close the Saga transaction later, but we need to make sure
> > >>> all the ongoing local transaction are finished.
> > >>>
> > >>> In the old way, Alpha can know nothing about the start of Async
> > >>> Componseable transaction if the TxStartedEvent is not sent,  so my
> > >>> propose that we introduce a new event TxAsyncStart to tell Alpha there
> > >>> is new Async invocation, so Alpha can keep tracking this Async
> > >>> invocation event the Componseable transaction is not started yet.  In
> > >>> this way we could block the Async invocation if the Saga transaction
> > >>> is timeout or close the Saga transaction if all the TxAsyncStart
> > >>> transaction is finished.
> > >>>
> > >>> Any thoughts?
> > >>>
> > >>> Willem Jiang
> > >>>
> > >>> Twitter: willemjiang
> > >>> Weibo: 姜宁willem
> > >>>
> > >>> On Mon, Jul 29, 2019 at 5:32 PM Willem Jiang <willem.ji...@gmail.com>
> > >>> wrote:
> > >>>>
> > >>>> Export the low level API could introduce some error if the user
> > >>>> doesn't use the API rightly.
> > >>>> My suggestion is we just
> > >>>> BTW, I submit a PR[1] to address this issue in a simple way (But we
> > >>>> still need to tell user what's the right way to configure the
> > >>>> annotation attribute).
> > >>>>
> > >>>> [1]https://github.com/apache/servicecomb-pack/pull/517
> > >>>>
> > >>>> Willem Jiang
> > >>>>
> > >>>> Twitter: willemjiang
> > >>>> Weibo: 姜宁willem
> > >>>>
> > >>>> On Thu, Jul 25, 2019 at 8:44 PM Daniel Qian <chanjars...@gmail.com>
> > >>> wrote:
> > >>>>>
> > >>>>> Hi Zhang Lei,
> > >>>>>
> > >>>>> What I'm trying to say is to provide a way for user to send
> > >>>>> TxEndedEvent, TxAbortedEvent, TxCompensatedEvent, SagaEndedEvent ...
> > >>>>> explicitly on Omega side.
> > >>>>> Because current implementation doesn't support following
> > >>> situation(async):
> > >>>>>
> > >>>>> @Compensable(compensationMethod="rollbackFoo")
> > >>>>> public void foo() {
> > >>>>>  new Thread(() -> /* local tx goes here */).start();
> > >>>>>  // TxEndedEvent sent when returns, it's too early
> > >>>>> }
> > >>>>>
> > >>>>> public void rollbackFoo() {
> > >>>>>  new Thread(() -> /* compensation goes here*/).start();
> > >>>>>  // TxCompensatedEvent sent when returns, it's too early
> > >>>>> }
> > >>>>>
> > >>>>> @SagaStart
> > >>>>> public void bar() {
> > >>>>>  new Thread(() -> /* call other service goes here */).start();
> > >>>>>  // SagaEndedEvent sent when returns, it's too early
> > >>>>> }
> > >>>>>
> > >>>>> I suggest providing a helper class, called omega or something else,
> > >>>>> user can use it to send TxEndedEvent, TxAbortedEvent,
> > >>>>> TxCompensatedEvent, SagaEndedEvent, etc. So the code goes like this:
> > >>>>>
> > >>>>> @Compensable(async=true, compensationMethod="rollbackFoo",
> > >>>>> compensationAsync=true)
> > >>>>> public void foo() {
> > >>>>>  TransactionContext txContext = omegaContext.getTransactionContext();
> > >>>>>  new Thread(() -> {
> > >>>>>    try {
> > >>>>>      /* local tx goes here */
> > >>>>>      omega.txEnded(txContext);
> > >>>>>    } catch(Exception e) {
> > >>>>>      omega.txAborted(txContext);
> > >>>>>    }
> > >>>>>  }).start();
> > >>>>> }
> > >>>>>
> > >>>>> public void rollbackFoo() {
> > >>>>>  TransactionContext txContext = omegaContext.getTransactionContext();
> > >>>>>  new Thread(() -> {
> > >>>>>    /*compensation goes here*/
> > >>>>>    omega.txCompensated()
> > >>>>>  }).start();
> > >>>>> }
> > >>>>>
> > >>>>> @SagaStart(async=true)
> > >>>>> public void bar() {
> > >>>>>  TransactionContext txContext = omegaContext.getTransactionContext();
> > >>>>>  new Thread(() -> {
> > >>>>>    /* call other service goes here */
> > >>>>>    try {
> > >>>>>      omega.sagaEnded(txContext);
> > >>>>>    } catch (Exception e) {
> > >>>>>      omega.sagaAborted(txContext);
> > >>>>>    }
> > >>>>>  }).start();
> > >>>>> }
> > >>>>>
> > >>>>>
> > >>>>> Zhang Lei <zhang_...@boco.com.cn> 于2019年7月25日周四 下午4:46写道:
> > >>>>>
> > >>>>>
> > >>>>> Zhang Lei <zhang_...@boco.com.cn> 于2019年7月25日周四 下午4:46写道:
> > >>>>>>
> > >>>>>> Hi, Daniel Qian
> > >>>>>>
> > >>>>>> Are you talking about the asynchronous problem with the @SagaStart
> > >>> and @Compensable methods on the Omega side? I think this is a typical 
> > >>> long
> > >>> transaction scene.
> > >>>>>>
> > >>>>>> Alpha based on Actor model has implemented asynchronous processing
> > >>> of Omega and Alpha, The event sent to Alpha needs to ensure that all 
> > >>> child
> > >>> transactions have been executed before sending SagaEndedEvent or
> > >>> SagaAbortedEvent.
> > >>>>>>
> > >>>>>> Lei Zhang
> > >>>>>>
> > >>>>>>> 在 2019年7月20日,下午9:49,Daniel Qian <chanjars...@gmail.com> 写道:
> > >>>>>>>
> > >>>>>>> After look into SCB-163, SCB-1385 and SCB-1386 I have some
> > >>> thoughts on Saga
> > >>>>>>> involved in async invocation.
> > >>>>>>> Current implementation is basically based on sync invocation,
> > >>> there are
> > >>>>>>> some assumption:
> > >>>>>>>
> > >>>>>>>  1. When @SagaStart method returns,  the Saga finished.
> > >>>>>>>  2. When @Compensable method returns/throws exception, the Local
> > >>> Tx
> > >>>>>>>  succeeds/failed.
> > >>>>>>>  3. When compensationMethod returns, the Local Tx is compensated.
> > >>>>>>>
> > >>>>>>> Even if considering what SCB-100 provided:
> > >>>>>>>
> > >>>>>>>  1. Add @OmegaContextAware annotation enabling
> > >>>>>>>  java.util.concurrent.Executor inject OmegaConext into threads it
> > >>>>>>>  manages/spawns
> > >>>>>>>  2. Make OmegaContext use InheritableThreadLocal field let child
> > >>> thread
> > >>>>>>>  inherit parent thread's Local Tx info
> > >>>>>>>
> > >>>>>>> There are still some limitations:
> > >>>>>>>
> > >>>>>>>  1. @OmegaContextAware is only viable if you use spring framework
> > >>>>>>>  2. @OmegaContextAware and OmegaContext's InheritableThreadLocal
> > >>> field
> > >>>>>>>  assuming that the calling thread or initator thread has Local
> > >>> Tx  info.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> What if user code use producer-consumer pattern in which
> > >>>>>>> InheritableThreadLocal can't work?
> > >>>>>>> What if user code use a thread scheduling library which we cannot
> > >>> use
> > >>>>>>> @OmegaContextAware,RxJava and Reactor, for example?
> > >>>>>>> I think we could provide some low-level APIs that user code can
> > >>> manualy
> > >>>>>>> starts/ends Saga and Local Tx, something like below:
> > >>>>>>>
> > >>>>>>> TxContext context = omega.startSaga();
> > >>>>>>> TxContext subTxContext = omega.startTx(TxContext parentTxContext);
> > >>>>>>> omega.endTx(TxContext);
> > >>>>>>> omega.abortTx(TxContext);
> > >>>>>>> omega.abortSaga(TxContext);
> > >>>>>>> omega.endSaga(TxContext);
> > >>>>>>>
> > >>>>>>> TxContext is just a immutable dto like this:
> > >>>>>>>
> > >>>>>>> public class TxContext {
> > >>>>>>> private final String globalTxId;
> > >>>>>>> private final String localTxId;
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> Above is a just a rough idea. So any thoughts?
> > >>>>>>> --
> > >>>>>>> Daniel Qian
> > >>>>>>>
> > >>>>>>> 博客:https://segmentfault.com/u/chanjarster
> > >>>>>>> github:https://github.com/chanjarster
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>> --
> > >>>>> Daniel Qian
> > >>>>>
> > >>>>> 博客:https://segmentfault.com/u/chanjarster
> > >>>>> github:https://github.com/chanjarster
> > >>>
> >



-- 
Daniel Qian

博客:https://segmentfault.com/u/chanjarster
github:https://github.com/chanjarster

Reply via email to