cherrylzhao closed pull request #262: SCB-865 Implement reaction of the event
in Alpha Server
URL: https://github.com/apache/incubator-servicecomb-saga/pull/262
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml
index c4916eea..75145e5b 100644
--- a/alpha/alpha-server/pom.xml
+++ b/alpha/alpha-server/pom.xml
@@ -200,7 +200,7 @@
</dependencies>
</profile>
<profile>
- <id>spring-boot-2</id>
+ <id>spring-boot-2</id>
<properties>
<spring.boot.version>${spring.boot2.version}</spring.boot.version>
</properties>
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
new file mode 100644
index 00000000..e05b0243
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcOmegaTccCallback.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+
+/**
+ * Grpc omega callback for TCC workflow.
+ *
+ * @author zhaojun
+ */
+public final class GrpcOmegaTccCallback implements OmegaCallback {
+
+ private StreamObserver<GrpcTccCordinateCommand> responseObserver;
+
+ public GrpcOmegaTccCallback(StreamObserver<GrpcTccCordinateCommand>
responseObserver) {
+ this.responseObserver = responseObserver;
+ }
+
+ @Override
+ public void compensate(ParticipateEvent event, TransactionStatus status) {
+ GrpcTccCordinateCommand command = GrpcTccCordinateCommand.newBuilder()
+ .setGlobalTxId(event.getGlobalTxId())
+ .setLocalTxId(event.getLocalTxId())
+ .setParentTxId(event.getParentTxId() == null ? "" :
event.getParentTxId())
+ .setMethod(TransactionStatus.Succeed.equals(status) ?
event.getConfirmMethod() : event.getCancelMethod())
+ .build();
+ responseObserver.onNext(command);
+ }
+}
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
new file mode 100644
index 00000000..a76530b6
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/GrpcTccEventService.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+import
org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEventFactory;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipateEvent;
+import
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
+
+/**
+ * Grpc TCC event service implement.
+ *
+ * @author zhaojun
+ */
+public class GrpcTccEventService extends
TccEventServiceGrpc.TccEventServiceImplBase {
+
+ private static final GrpcAck ALLOW =
GrpcAck.newBuilder().setAborted(false).build();
+ private static final GrpcAck REJECT =
GrpcAck.newBuilder().setAborted(true).build();
+
+ @Override
+ public void onConnected(GrpcServiceConfig request,
StreamObserver<GrpcTccCordinateCommand> responseObserver) {
+ OmegaCallbacksRegistry.register(request, responseObserver);
+ }
+
+ @Override
+ public void onTccTransactionStarted(GrpcTccTransactionStartedEvent request,
StreamObserver<GrpcAck> responseObserver) {
+ }
+
+ @Override
+ public void participate(GrpcTccParticipateEvent request,
StreamObserver<GrpcAck> responseObserver) {
+ TransactionEventRegistry.register(ParticipateEventFactory.create(request));
+ responseObserver.onNext(ALLOW);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request,
StreamObserver<GrpcAck> responseObserver) {
+ for (ParticipateEvent event :
TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
+ OmegaCallbacksRegistry.retrieve(event.getServiceName(),
event.getInstanceId()).compensate(event, event.getStatus());
+ }
+ responseObserver.onNext(ALLOW);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onDisconnected(GrpcServiceConfig request,
StreamObserver<GrpcAck> responseObserver) {
+ OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(),
request.getInstanceId()).disconnect();
+ responseObserver.onNext(ALLOW);
+ responseObserver.onCompleted();
+ }
+}
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java
new file mode 100644
index 00000000..14f8842f
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
+public interface OmegaCallback {
+
+ void compensate(ParticipateEvent event, TransactionStatus status);
+
+ default void disconnect() {
+ }
+}
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
new file mode 100644
index 00000000..c505df11
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/OmegaCallbacksRegistry.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import static java.util.Collections.emptyMap;
+
+import io.grpc.stub.StreamObserver;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand;
+
+/**
+ * Manage Omega callbacks.
+ *
+ * @author zhaojun
+ */
+public final class OmegaCallbacksRegistry {
+
+ private final static Map<String, Map<String, OmegaCallback>> REGISTRY = new
ConcurrentHashMap<>();
+
+ /**
+ * Register omega TCC callback.
+ *
+ * @param request Grpc service config
+ * @param responseObserver stream observer
+ */
+ public static void register(GrpcServiceConfig request,
StreamObserver<GrpcTccCordinateCommand> responseObserver) {
+ REGISTRY
+ .computeIfAbsent(request.getServiceName(), key -> new
ConcurrentHashMap<>())
+ .put(request.getInstanceId(), new
GrpcOmegaTccCallback(responseObserver));
+ }
+
+ /**
+ * Retrieve omega TCC callback by service name and instance id.
+ *
+ * @param serviceName service name
+ * @param instanceId instance id
+ * @return Grpc omega TCC callback
+ */
+ public static OmegaCallback retrieve(String serviceName, String instanceId) {
+ return REGISTRY.getOrDefault(serviceName, emptyMap()).get(instanceId);
+ }
+
+ /**
+ * Retrieve omega TCC callback by service name and instance id, then remove
it from registry.
+ *
+ * @param serviceName service name
+ * @param instanceId instance id
+ * @return Grpc omega TCC callback
+ */
+ public static OmegaCallback retrieveThenRemove(String serviceName, String
instanceId) {
+ return REGISTRY.getOrDefault(serviceName, emptyMap()).remove(instanceId);
+ }
+
+}
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
new file mode 100644
index 00000000..b89967ac
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/TransactionEventRegistry.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.saga.alpha.tcc.server.event.ParticipateEvent;
+
+/**
+ * Manage TCC transaction event.
+ *
+ * @author zhaojun
+ */
+public final class TransactionEventRegistry {
+
+ private final static Map<String, List<ParticipateEvent>> REGISTRY = new
ConcurrentHashMap<>();
+
+ /**
+ * Register participate event.
+ *
+ * @param participateEvent participate event
+ */
+ public static void register(ParticipateEvent participateEvent) {
+ REGISTRY
+ .computeIfAbsent(participateEvent.getGlobalTxId(), key -> new
LinkedList<>())
+ .add(participateEvent);
+ }
+
+ /**
+ * Retrieve participate event from registry.
+ *
+ * @param globalTxId global transaction id
+ * @return participate events
+ */
+ public static List<ParticipateEvent> retrieve(String globalTxId) {
+ return REGISTRY.get(globalTxId);
+ }
+}
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEvent.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEvent.java
new file mode 100644
index 00000000..66182c64
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEvent.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.event;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
+/**
+ * Participate event.
+ *
+ * @author zhaojun
+ */
+public class ParticipateEvent {
+
+ private String globalTxId;
+ private String localTxId;
+ private String parentTxId;
+ private String serviceName;
+ private String instanceId;
+ private String confirmMethod;
+ private String cancelMethod;
+ private TransactionStatus status;
+
+ public ParticipateEvent(String globalTxId, String localTxId, String
parentTxId, String serviceName,
+ String instanceId, String confirmMethod, String cancelMethod,
TransactionStatus status) {
+ this.globalTxId = globalTxId;
+ this.localTxId = localTxId;
+ this.parentTxId = parentTxId;
+ this.serviceName = serviceName;
+ this.instanceId = instanceId;
+ this.confirmMethod = confirmMethod;
+ this.cancelMethod = cancelMethod;
+ this.status = status;
+ }
+
+ public String getGlobalTxId() {
+ return globalTxId;
+ }
+
+ public void setGlobalTxId(String globalTxId) {
+ this.globalTxId = globalTxId;
+ }
+
+ public String getLocalTxId() {
+ return localTxId;
+ }
+
+ public void setLocalTxId(String localTxId) {
+ this.localTxId = localTxId;
+ }
+
+ public String getParentTxId() {
+ return parentTxId;
+ }
+
+ public void setParentTxId(String parentTxId) {
+ this.parentTxId = parentTxId;
+ }
+
+ public String getConfirmMethod() {
+ return confirmMethod;
+ }
+
+ public void setConfirmMethod(String confirmMethod) {
+ this.confirmMethod = confirmMethod;
+ }
+
+ public String getCancelMethod() {
+ return cancelMethod;
+ }
+
+ public void setCancelMethod(String cancelMethod) {
+ this.cancelMethod = cancelMethod;
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public void setServiceName(String serviceName) {
+ this.serviceName = serviceName;
+ }
+
+ public String getInstanceId() {
+ return instanceId;
+ }
+
+ public void setInstanceId(String instanceId) {
+ this.instanceId = instanceId;
+ }
+
+ public TransactionStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(TransactionStatus status) {
+ this.status = status;
+ }
+}
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
new file mode 100644
index 00000000..d876acf6
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/tcc/server/event/ParticipateEventFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server.event;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipateEvent;
+
+/**
+ * Participate event factory.
+ *
+ * @author zhaojun
+ */
+public class ParticipateEventFactory {
+
+ public static ParticipateEvent create(GrpcTccParticipateEvent request) {
+ return new ParticipateEvent(
+ request.getGlobalTxId(),
+ request.getLocalTxId(),
+ request.getParentTxId(),
+ request.getConfirmMethod(),
+ request.getCancelMethod(),
+ request.getServiceName(),
+ request.getInstanceId(),
+ TransactionStatus.valueOf(request.getStatus())
+ );
+ }
+}
diff --git
a/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/TccStart.java
b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/TccStart.java
new file mode 100644
index 00000000..6c8afd2d
--- /dev/null
+++
b/omega/omega-context/src/main/java/org/apache/servicecomb/saga/omega/context/annotations/TccStart.java
@@ -0,0 +1,22 @@
+package org.apache.servicecomb.saga.omega.context.annotations;
+
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+/**
+ * Indicates the annotated method will start a TCC .
+ */
+@Retention(RUNTIME)
+@Target(METHOD)
+public @interface TccStart {
+ /**
+ * TCC timeout, in seconds. <br>
+ * Default value is 0, which means never timeout.
+ *
+ * @return
+ */
+ int timeout() default 0;
+}
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Participate.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Participate.java
new file mode 100644
index 00000000..83510762
--- /dev/null
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/annotations/Participate.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.saga.omega.transaction.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD})
+/**
+ * Indicates the annotated method will start a sub-transaction. <br>
+ * A <code>@Participate</code> method should satisfy below requirements:
+ * <ol>
+ * <li>all parameters are serialized</li>
+ * <li>is idempotent</li>
+ * <li>the object instance which @Participate method resides in should be
stateless</li>
+ * </ol>
+ */
+public @interface Participate {
+ /**
+ * Confirm method name.<br>
+ * A confirm method should satisfy below requirements:
+ * <ol>
+ * <li>has same parameter list as @Participate method's</li>
+ * <li>all parameters are serialized</li>
+ * <li>is idempotent</li>
+ * <li>be in the same class as @Participate method is in</li>
+ * </ol>
+ *
+ * @return
+ */
+ String confirmMethod() default "";
+
+ /**
+ * Cancel method name.<br>
+ * A cancel method should satisfy below requirements:
+ * <ol>
+ * <li>has same parameter list as @Participate method's</li>
+ * <li>all parameters are serialized</li>
+ * <li>is idempotent</li>
+ * <li>be in the same class as @Participate method is in</li>
+ * </ol>
+ *
+ * @return
+ */
+ String cancelMethod() default "";
+
+}
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
new file mode 100644
index 00000000..ae011bca
--- /dev/null
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccEventService.java
@@ -0,0 +1,27 @@
+package org.apache.servicecomb.saga.omega.transaction.tcc;
+
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.TxEvent;
+import
org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import
org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+
+public interface TccEventService {
+
+ void onConnected();
+
+ void onDisconnected();
+
+ void close();
+
+ String target();
+
+ AlphaResponse participate(ParticipatedEvent participateEvent);
+
+ AlphaResponse TccTransactionStart(TccStartedEvent tccStartEvent);
+
+ AlphaResponse TccTransactionStop(TccEndedEvent tccEndEvent);
+
+ AlphaResponse send(TxEvent event);
+
+}
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java
new file mode 100644
index 00000000..e64bc2a1
--- /dev/null
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccParticipatorAspect.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.omega.transaction.tcc;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicy;
+import org.apache.servicecomb.saga.omega.transaction.RecoveryPolicyFactory;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Compensable;
+import org.apache.servicecomb.saga.omega.transaction.annotations.Participate;
+import
org.apache.servicecomb.saga.omega.transaction.tcc.events.ParticipatedEvent;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Aspect
+public class TccParticipatorAspect {
+ private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final OmegaContext context;
+ private final TccEventService tccEventService;
+
+ public TccParticipatorAspect(TccEventService tccEventService, OmegaContext
context) {
+ this.context = context;
+ this.tccEventService = tccEventService;
+ }
+
+
@Around("execution(@org.apache.servicecomb.saga.omega.transaction.annotations.Participate
* *(..)) && @annotation(participate)")
+ Object advise(ProceedingJoinPoint joinPoint, Participate participate) throws
Throwable {
+ Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+ String localTxId = context.localTxId();
+ String cancelMethod = participate.cancelMethod();
+ String confirmMethod = participate.confirmMethod();
+
+ context.newLocalTxId();
+ LOG.debug("Updated context {} for participate method {} ", context,
method.toString());
+
+ try {
+ Object result = joinPoint.proceed();
+ // Send the participate message back
+ tccEventService.participate(new ParticipatedEvent(context.globalTxId(),
context.localTxId(), localTxId, cancelMethod, confirmMethod,
+ TransactionStatus.Succeed));
+ LOG.debug("Participate Transaction with context {} has finished.",
context);
+ return result;
+ } catch (Throwable throwable) {
+ // Now we don't handle the error message
+ tccEventService.participate(new ParticipatedEvent(context.globalTxId(),
context.localTxId(), localTxId, cancelMethod,
+ confirmMethod, TransactionStatus.Failed));
+ LOG.error("Participate Transaction with context {} failed.", context,
throwable);
+ throw throwable;
+ } finally {
+ context.setLocalTxId(localTxId);
+ }
+ }
+}
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
new file mode 100644
index 00000000..26621d13
--- /dev/null
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAnnotationProcessor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.saga.omega.transaction.tcc;
+
+import javax.transaction.TransactionalException;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.transaction.AlphaResponse;
+import org.apache.servicecomb.saga.omega.transaction.EventAwareInterceptor;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.apache.servicecomb.saga.omega.transaction.TxAbortedEvent;
+import org.apache.servicecomb.saga.omega.transaction.tcc.events.TccEndedEvent;
+import
org.apache.servicecomb.saga.omega.transaction.tcc.events.TccStartedEvent;
+
+public class TccStartAnnotationProcessor implements EventAwareInterceptor {
+
+ private final OmegaContext omegaContext;
+ private final TccEventService eventService;
+
+ TccStartAnnotationProcessor(OmegaContext omegaContext, TccEventService
eventService) {
+ this.omegaContext = omegaContext;
+ this.eventService = eventService;
+ }
+
+ @Override
+ public AlphaResponse preIntercept(String parentTxId, String
compensationMethod, int timeout, String retriesMethod,
+ int retries, Object... message) {
+ try {
+ return eventService.TccTransactionStart(new
TccStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId()));
+ } catch (OmegaException e) {
+ throw new TransactionalException(e.getMessage(), e.getCause());
+ }
+ }
+
+ @Override
+ public void postIntercept(String parentTxId, String compensationMethod) {
+ eventService.TccTransactionStop(new
TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+ TransactionStatus.Succeed));
+ }
+
+ @Override
+ public void onError(String parentTxId, String compensationMethod, Throwable
throwable) {
+ // Send the cancel event
+ // Do we need to wait for the alpha finish all the transaction
+ eventService.TccTransactionStop(new
TccEndedEvent(omegaContext.globalTxId(), omegaContext.localTxId(),
+ TransactionStatus.Failed));
+ }
+}
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
new file mode 100644
index 00000000..f6d1d77f
--- /dev/null
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/TccStartAspect.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.saga.omega.transaction.tcc;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Method;
+
+import org.apache.servicecomb.saga.omega.context.OmegaContext;
+import org.apache.servicecomb.saga.omega.context.annotations.SagaStart;
+import org.apache.servicecomb.saga.omega.context.annotations.TccStart;
+import org.apache.servicecomb.saga.omega.transaction.MessageSender;
+import org.apache.servicecomb.saga.omega.transaction.OmegaException;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Aspect
+public class TccStartAspect {
+ private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final TccStartAnnotationProcessor tccStartAnnotationProcessor;
+
+ private final OmegaContext context;
+
+ public TccStartAspect(TccEventService tccEventServicer, OmegaContext
context) {
+ this.context = context;
+ this.tccStartAnnotationProcessor = new
TccStartAnnotationProcessor(context, tccEventServicer);
+ }
+
+
@Around("execution(@org.apache.servicecomb.saga.omega.context.annotations.TccStart
* *(..)) && @annotation(tccStart)")
+ Object advise(ProceedingJoinPoint joinPoint, TccStart tccStart) throws
Throwable {
+ initializeOmegaContext();
+ Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
+
+ tccStartAnnotationProcessor.preIntercept(context.globalTxId(),
method.toString(), tccStart.timeout(), "", 0);
+ LOG.debug("Initialized context {} before execution of method {}", context,
method.toString());
+
+ try {
+ Object result = joinPoint.proceed();
+
+ tccStartAnnotationProcessor.postIntercept(context.globalTxId(),
method.toString());
+ LOG.debug("Transaction with context {} has finished.", context);
+
+ return result;
+ } catch (Throwable throwable) {
+ // We don't need to handle the OmegaException here
+ if (!(throwable instanceof OmegaException)) {
+ tccStartAnnotationProcessor.onError(context.globalTxId(),
method.toString(), throwable);
+ LOG.error("Transaction {} failed.", context.globalTxId());
+ }
+ throw throwable;
+ } finally {
+ context.clear();
+ }
+ }
+
+ private void initializeOmegaContext() {
+ context.setLocalTxId(context.newGlobalTxId());
+ }
+}
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java
new file mode 100644
index 00000000..3372f8e0
--- /dev/null
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/ParticipatedEvent.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.saga.omega.transaction.tcc.events;
+
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
+public class ParticipatedEvent {
+
+ private final String globalTxId;
+ private final String localTxId;
+ private final String parentTxId;
+ private final String confirmMethod;
+ private final String cancelMethod;
+ private final TransactionStatus status;
+
+
+ public ParticipatedEvent(String globalTxId, String localTxId, String
parentTxId, String confirmMethod,
+ String cancelMethod, TransactionStatus status) {
+ this.globalTxId = globalTxId;
+ this.localTxId = localTxId;
+ this.parentTxId = parentTxId;
+ this.confirmMethod = confirmMethod;
+ this.cancelMethod = cancelMethod;
+ this.status = status;
+ }
+}
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java
new file mode 100644
index 00000000..7c666b2b
--- /dev/null
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccEndedEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.saga.omega.transaction.tcc.events;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
+public class TccEndedEvent {
+ private final String globalTxId;
+ private final String localTxId;
+ private final TransactionStatus status;
+
+
+ public TccEndedEvent(String globalTxId, String localTxId,
+ TransactionStatus status) {
+ this.globalTxId = globalTxId;
+ this.localTxId = localTxId;
+ this.status = status;
+ }
+}
diff --git
a/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java
new file mode 100644
index 00000000..edd0333f
--- /dev/null
+++
b/omega/omega-transaction/src/main/java/org/apache/servicecomb/saga/omega/transaction/tcc/events/TccStartedEvent.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.saga.omega.transaction.tcc.events;
+
+public class TccStartedEvent {
+ private final String globalTxId;
+ private final String localTxId;
+
+
+ public String getGlobalTxId() {
+ return globalTxId;
+ }
+
+ public String getLocalTxId() {
+ return localTxId;
+ }
+
+
+
+ public TccStartedEvent(String globalTxId, String localTxId) {
+ this.globalTxId = globalTxId;
+ this.localTxId = localTxId;
+ }
+}
diff --git
a/pack-common/src/main/java/org/apache/servicecomb/saga/common/TransactionStatus.java
b/pack-common/src/main/java/org/apache/servicecomb/saga/common/TransactionStatus.java
new file mode 100644
index 00000000..ea5a6624
--- /dev/null
+++
b/pack-common/src/main/java/org/apache/servicecomb/saga/common/TransactionStatus.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.common;
+
+public enum TransactionStatus {
+ Succeed,
+ Failed
+}
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcCommon.proto
b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcCommon.proto
new file mode 100644
index 00000000..40a8db7e
--- /dev/null
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcCommon.proto
@@ -0,0 +1,32 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.apache.servicecomb.saga.pack.contract.grpc";
+
+// Define the common structs which could be use for TccEventService and
SagaEventService
+
+message GrpcServiceConfig {
+ string serviceName = 1;
+ string instanceId = 2;
+}
+
+message GrpcAck {
+ bool aborted = 1;
+}
diff --git
a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTccEvent.proto
b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTccEvent.proto
new file mode 100644
index 00000000..50fb0a80
--- /dev/null
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTccEvent.proto
@@ -0,0 +1,69 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "org.apache.servicecomb.saga.pack.contract.grpc";
+option java_outer_classname = "TccEventProto";
+
+import "GrpcCommon.proto";
+
+service TccEventService {
+ rpc OnConnected (GrpcServiceConfig) returns (stream GrpcTccCordinateCommand)
{
+ }
+ rpc participate(GrpcTccParticipateEvent) returns (GrpcAck) {}
+ rpc OnTccTransactionStarted (GrpcTccTransactionStartedEvent) returns
(GrpcAck) {}
+ rpc OnTccTransactionEnded (GrpcTccTransactionEndedEvent) returns (GrpcAck) {}
+ rpc OnDisconnected (GrpcServiceConfig) returns (GrpcAck) {
+ }
+}
+
+message GrpcTccTransactionStartedEvent {
+ int64 timestamp = 1;
+ string globalTxId = 2;
+ string localTxId = 3;
+ string parentTxId = 4;
+}
+
+message GrpcTccParticipateEvent {
+ int64 timestamp = 1;
+ string globalTxId = 2;
+ string localTxId = 3;
+ string parentTxId = 4;
+ string serviceName = 5;
+ string instanceId = 6;
+ string confirmMethod = 7;
+ string cancelMethod = 8;
+ string status = 9;
+}
+
+message GrpcTccTransactionEndedEvent {
+ int64 timestamp = 1;
+ string globalTxId = 2;
+ string localTxId = 3;
+ string parentTxId = 4;
+ string status = 5;
+}
+
+message GrpcTccCordinateCommand {
+ string globalTxId = 1;
+ string localTxId = 2;
+ string parentTxId = 3;
+ string method = 4;
+}
+
diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
index d2c6f77c..f037b4fd 100644
--- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
+++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto
@@ -21,6 +21,8 @@ option java_multiple_files = true;
option java_package = "org.apache.servicecomb.saga.pack.contract.grpc";
option java_outer_classname = "TxEventProto";
+import "GrpcCommon.proto";
+
service TxEventService {
rpc OnConnected (GrpcServiceConfig) returns (stream GrpcCompensateCommand) {
}
@@ -29,15 +31,6 @@ service TxEventService {
}
}
-message GrpcServiceConfig {
- string serviceName = 1;
- string instanceId = 2;
-}
-
-message GrpcAck {
- bool aborted = 1;
-}
-
message GrpcTxEvent {
int64 timestamp = 1;
string globalTxId = 2;
diff --git
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java
index f2869b3d..3103a8a7 100644
---
a/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java
+++
b/saga-core/src/main/java/org/apache/servicecomb/saga/core/dag/GraphCycleDetectorImpl.java
@@ -21,10 +21,8 @@
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
-import java.util.stream.Collectors;
/**
* Cycle detection is based on topological sort with Kahn's algorithm.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services