This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 80be074881d218fe2a18e7c1ffb45d021a9e2893 Author: Willem Jiang <[email protected]> AuthorDate: Mon Aug 27 16:13:31 2018 +0800 SCB-817 Update the omega transactions code --- .../transaction/tcc/CoordinateMessageHandler.java | 36 +++++++ .../tcc/TccStartAnnotationProcessor.java | 6 +- .../transaction/tcc/events/CoordinatedEvent.java | 53 ++++++++++ .../tcc/CoordinateMessageHandlerTest.java | 109 +++++++++++++++++++++ .../transaction/tcc/TccParticipatorAspectTest.java | 11 ++- .../tcc/TccStartAnnotationProcessorTest.java | 10 +- .../omega/transaction/tcc/TccStartAspectTest.java | 9 +- 7 files changed, 222 insertions(+), 12 deletions(-) diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java new file mode 100644 index 0000000..7663e72 --- /dev/null +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandler.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.servicecomb.saga.omega.transaction.tcc; + +import org.apache.servicecomb.saga.common.TransactionStatus; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent; + +public class CoordinateMessageHandler implements MessageHandler { + + private final TccEventService tccEventService; + + public CoordinateMessageHandler(TccEventService tccEventService) { + this.tccEventService = tccEventService; + } + + @Override + public void onReceive(String globalTxId, String localTxId, String parentTxId, String methodName) { + //TODO Omega Call the service + tccEventService.coordinate(new CoordinatedEvent(globalTxId, localTxId, parentTxId, methodName, TransactionStatus.Succeed)); + } +} diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java index 55198dd..137a397 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java @@ -37,21 +37,21 @@ public class TccStartAnnotationProcessor { public AlphaResponse preIntercept(String parentTxId, String methodName, int timeout) { try { - return eventService.TccTransactionStart(new TccStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId())); + return eventService.tccTransactionStart(new TccStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId())); } catch (OmegaException e) { throw new TransactionalException(e.getMessage(), e.getCause()); } } public void postIntercept(String parentTxId, String methodName) { - eventService.TccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), + eventService.tccTransactionStop(new 
TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), TransactionStatus.Succeed)); } public void onError(String parentTxId, String methodName, Throwable throwable) { // Send the cancel event // Do we need to wait for the alpha finish all the transaction - eventService.TccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), + eventService.tccTransactionStop(new TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), TransactionStatus.Failed)); } } diff --git a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java index 6d88924..b770ff3 100644 --- a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java +++ b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/CoordinatedEvent.java @@ -1,4 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.servicecomb.saga.omega.transaction.tcc.events; +import org.apache.servicecomb.saga.common.TransactionStatus; + public class CoordinatedEvent { + private final String globalTxId; + private final String localTxId; + private final String parentTxId; + private final String methodName; + private final TransactionStatus status; + + public CoordinatedEvent(String globalTxId, String localTxId, String parentTxId, String methodName, + TransactionStatus status) { + this.globalTxId = globalTxId; + this.localTxId = localTxId; + this.parentTxId = parentTxId; + this.methodName = methodName; + this.status = status; + } + + public String getGlobalTxId() { + return globalTxId; + } + + public String getLocalTxId() { + return localTxId; + } + + public String getParentTxId() { + return parentTxId; + } + + public String getMethodName() { + return methodName; + } + + public TransactionStatus getStatus() { + return status; + } } diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java new file mode 100644 index 0000000..b260295 --- /dev/null +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/CoordinateMessageHandlerTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.omega.transaction.tcc; + + +import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.servicecomb.saga.common.TransactionStatus; +import org.apache.servicecomb.saga.omega.transaction.AlphaResponse; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent; +import org.junit.Before; +import org.junit.Test; + +public class CoordinateMessageHandlerTest { + private final List<CoordinatedEvent> coordinatedEvents = new ArrayList<>(); + private final AlphaResponse response = new AlphaResponse(false); + private final TccEventService eventService = new TccEventService() { + @Override + public void onConnected() { + + } + + @Override + public void onDisconnected() { + + } + + @Override + public void close() { + + } + + @Override + public String target() { + return null; + } + + @Override + public AlphaResponse participate(ParticipatedEvent participateEvent) { + return null; + } + + @Override + public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) { + return null; + } + + @Override + public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) { + return null; 
+ } + + @Override + public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) { + coordinatedEvents.add(coordinatedEvent); + return response; + } + }; + + private final String globalTxId = uniquify("globalTxId"); + private final String localTxId = uniquify("localTxId"); + private final String parentTxId = uniquify("parentTxId"); + private final String methodName= uniquify("Method"); + + private final CoordinateMessageHandler handler = new CoordinateMessageHandler(eventService); + + @Before + public void setUp() { + coordinatedEvents.clear(); + } + + @Test + public void sendsCompensatedEventOnCompensationCompleted() { + handler.onReceive(globalTxId, localTxId, parentTxId, methodName); + + assertThat(coordinatedEvents.size(), is(1)); + + CoordinatedEvent event = coordinatedEvents.get(0); + assertThat(event.getGlobalTxId(), is(globalTxId)); + assertThat(event.getLocalTxId(), is(localTxId)); + assertThat(event.getParentTxId(), is(parentTxId)); + assertThat(event.getMethodName(), is(methodName)); + assertThat(event.getStatus(), is(TransactionStatus.Succeed)); + } + +} diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java index 8e6b36e..b3cd806 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspectTest.java @@ -31,10 +31,9 @@ import java.util.UUID; import org.apache.servicecomb.saga.common.TransactionStatus; import org.apache.servicecomb.saga.omega.context.IdGenerator; import org.apache.servicecomb.saga.omega.context.OmegaContext; -import org.apache.servicecomb.saga.omega.context.annotations.TccStart; import org.apache.servicecomb.saga.omega.transaction.AlphaResponse; -import 
org.apache.servicecomb.saga.omega.transaction.annotations.Compensable; import org.apache.servicecomb.saga.omega.transaction.annotations.Participate; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent; import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent; import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent; import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent; @@ -81,15 +80,19 @@ public class TccParticipatorAspectTest { } @Override - public AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent) { + public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) { return null; } @Override - public AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent) { + public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) { return null; } + @Override + public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) { + return null; + } }; diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessorTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessorTest.java index 95f8137..fa34ad7 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessorTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessorTest.java @@ -33,6 +33,7 @@ import org.apache.servicecomb.saga.omega.context.IdGenerator; import org.apache.servicecomb.saga.omega.context.OmegaContext; import org.apache.servicecomb.saga.omega.transaction.AlphaResponse; import org.apache.servicecomb.saga.omega.transaction.OmegaException; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent; import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent; import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent; import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent; @@ -79,7 +80,7 @@ public class TccStartAnnotationProcessorTest { } @Override - public AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent) { + public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) { if (throwException) { throw exception; } @@ -88,12 +89,15 @@ public class TccStartAnnotationProcessorTest { } @Override - public AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent) { + public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) { endedEvents.add(tccEndEvent); return response; } - + @Override + public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) { + return null; + } }; private final TccStartAnnotationProcessor tccStartAnnotationProcessor = new TccStartAnnotationProcessor(context, eventService); diff --git a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspectTest.java b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspectTest.java index 005af67..80f14e4 100644 --- a/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspectTest.java +++ b/omega/omega-transaction/src/test/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspectTest.java @@ -32,6 +32,7 @@ import org.apache.servicecomb.saga.omega.context.IdGenerator; import org.apache.servicecomb.saga.omega.context.OmegaContext; import org.apache.servicecomb.saga.omega.context.annotations.TccStart; import org.apache.servicecomb.saga.omega.transaction.AlphaResponse; +import org.apache.servicecomb.saga.omega.transaction.tcc.events.CoordinatedEvent; import org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent; import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent; import 
org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent; @@ -75,17 +76,21 @@ public class TccStartAspectTest { } @Override - public AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent) { + public AlphaResponse tccTransactionStart(TccStartedEvent tccStartEvent) { startedEvents.add(tccStartEvent); return response; } @Override - public AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent) { + public AlphaResponse tccTransactionStop(TccEndedEvent tccEndEvent) { endedEvents.add(tccEndEvent); return response; } + @Override + public AlphaResponse coordinate(CoordinatedEvent coordinatedEvent) { + return null; + } };
