WillemJiang closed pull request #268: SCB-856 Implement reaction of the event
in Alpha Server
URL: https://github.com/apache/incubator-servicecomb-saga/pull/268
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
index e45acdd4..d332be20 100644
---
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -23,10 +23,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
-
import org.apache.servicecomb.saga.alpha.core.CommandRepository;
import org.apache.servicecomb.saga.alpha.core.CompositeOmegaCallback;
import org.apache.servicecomb.saga.alpha.core.EventScanner;
@@ -36,6 +34,9 @@
import org.apache.servicecomb.saga.alpha.core.TxConsistentService;
import org.apache.servicecomb.saga.alpha.core.TxEventRepository;
import org.apache.servicecomb.saga.alpha.core.TxTimeoutRepository;
+import org.apache.servicecomb.saga.alpha.server.tcc.GrpcTccEventService;
+import
org.apache.servicecomb.saga.alpha.server.tcc.callback.OmegaCallbackWrapper;
+import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccCallbackEngine;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
@@ -86,30 +87,26 @@ ScheduledExecutorService compensationScheduler() {
@Bean
TxConsistentService txConsistentService(
@Value("${alpha.event.pollingInterval:500}") int eventPollingInterval,
- GrpcServerConfig serverConfig,
ScheduledExecutorService scheduler,
TxEventRepository eventRepository,
CommandRepository commandRepository,
TxTimeoutRepository timeoutRepository,
- OmegaCallback omegaCallback,
- Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
-
- new EventScanner(scheduler,
- eventRepository, commandRepository, timeoutRepository,
- omegaCallback, eventPollingInterval).run();
-
- TxConsistentService consistentService = new
TxConsistentService(eventRepository);
-
- ServerStartable startable = buildGrpc(serverConfig, consistentService,
omegaCallbacks);
- new Thread(startable::start).start();
-
- return consistentService;
+ OmegaCallback omegaCallback) {
+ new EventScanner(scheduler,
+ eventRepository, commandRepository, timeoutRepository,
+ omegaCallback, eventPollingInterval).run();
+ TxConsistentService consistentService = new
TxConsistentService(eventRepository);
+ return consistentService;
}
- private ServerStartable buildGrpc(GrpcServerConfig serverConfig,
TxConsistentService txConsistentService,
+ @Bean
+ ServerStartable serverStartable(GrpcServerConfig serverConfig,
TxConsistentService txConsistentService,
Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
- return new GrpcStartable(serverConfig,
- new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks));
+ ServerStartable bootstrap = new GrpcStartable(serverConfig,
+ new GrpcTxEventEndpointImpl(txConsistentService, omegaCallbacks),
+ new GrpcTccEventService(new TccCallbackEngine(new
OmegaCallbackWrapper())));
+ new Thread(bootstrap::start).start();
+ return bootstrap;
}
@PostConstruct
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcServerConfig.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcServerConfig.java
index 66dd9925..e1368c43 100644
---
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcServerConfig.java
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcServerConfig.java
@@ -17,8 +17,8 @@
package org.apache.servicecomb.saga.alpha.server;
-import org.springframework.context.annotation.Configuration;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
@Configuration
public class GrpcServerConfig {
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
index 4d993748..a5999672 100644
---
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/GrpcStartable.java
@@ -41,12 +41,12 @@
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
-class GrpcStartable implements ServerStartable {
+public class GrpcStartable implements ServerStartable {
private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Server server;
- GrpcStartable(GrpcServerConfig serverConfig, BindableService... services) {
+ public GrpcStartable(GrpcServerConfig serverConfig, BindableService...
services) {
ServerBuilder<?> serverBuilder;
if (serverConfig.isSslEnable()){
serverBuilder = NettyServerBuilder.forAddress(
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/ServerStartable.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/ServerStartable.java
index 33d39df9..41dfdbdd 100644
---
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/ServerStartable.java
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/ServerStartable.java
@@ -20,6 +20,6 @@
package org.apache.servicecomb.saga.alpha.server;
-interface ServerStartable {
+public interface ServerStartable {
void start();
}
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
new file mode 100644
index 00000000..e3b89b99
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc;
+
+import io.grpc.stub.StreamObserver;
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.saga.alpha.server.tcc.callback.OmegaCallback;
+import org.apache.servicecomb.saga.alpha.server.tcc.callback.TccCallbackEngine;
+import
org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipateEventFactory;
+import
org.apache.servicecomb.saga.alpha.server.tcc.registry.OmegaCallbacksRegistry;
+import
org.apache.servicecomb.saga.alpha.server.tcc.registry.TransactionEventRegistry;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+import
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Grpc TCC event service implement.
+ */
+public class GrpcTccEventService extends
TccEventServiceGrpc.TccEventServiceImplBase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final GrpcAck ALLOW =
GrpcAck.newBuilder().setAborted(false).build();
+
+ private static final GrpcAck REJECT =
GrpcAck.newBuilder().setAborted(true).build();
+
+ private final TccCallbackEngine tccCallbackEngine;
+
+ public GrpcTccEventService(
+ TccCallbackEngine tccCallbackEngine) {
+ this.tccCallbackEngine = tccCallbackEngine;
+ }
+
+ @Override
+ public void onConnected(GrpcServiceConfig request,
StreamObserver<GrpcTccCoordinateCommand> responseObserver) {
+ OmegaCallbacksRegistry.register(request, responseObserver);
+ LOG.info("Established connection service [{}] instanceId [{}].",
request.getServiceName(), request.getInstanceId());
+ }
+
+ @Override
+ public void onTccTransactionStarted(GrpcTccTransactionStartedEvent request,
StreamObserver<GrpcAck> responseObserver) {
+ LOG.info("Received transaction start event, global tx id: {}",
request.getGlobalTxId());
+ responseObserver.onNext(ALLOW);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void participate(GrpcTccParticipatedEvent request,
StreamObserver<GrpcAck> responseObserver) {
+ TransactionEventRegistry.register(ParticipateEventFactory.create(request));
+ responseObserver.onNext(ALLOW);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request,
StreamObserver<GrpcAck> responseObserver) {
+ LOG.info("Received transaction end event, global tx id: {}",
request.getGlobalTxId());
+ responseObserver.onNext(tccCallbackEngine.execute(request) ? ALLOW :
REJECT);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onDisconnected(GrpcServiceConfig request,
StreamObserver<GrpcAck> responseObserver) {
+ OmegaCallback omegaCallback =
OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(),
request.getInstanceId());
+ if (null != omegaCallback) {
+ LOG.info("Disconnect from alpha, service [{}] instanceId [{}].",
request.getServiceName(), request.getInstanceId());
+ omegaCallback.disconnect();
+ }
+ responseObserver.onNext(ALLOW);
+ responseObserver.onCompleted();
+ }
+}
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/CallbackEngine.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/CallbackEngine.java
new file mode 100644
index 00000000..a8521e69
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/CallbackEngine.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.callback;
+
+import
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+
+public interface CallbackEngine {
+
+ boolean execute(GrpcTccTransactionEndedEvent request);
+}
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/GrpcOmegaTccCallback.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/GrpcOmegaTccCallback.java
new file mode 100644
index 00000000..cf8eb8d1
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/GrpcOmegaTccCallback.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.callback;
+
+import io.grpc.stub.StreamObserver;
+import org.apache.servicecomb.saga.alpha.server.tcc.callback.OmegaCallback;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+
+/**
+ * Grpc omega callback for TCC workflow.
+ */
+public final class GrpcOmegaTccCallback implements OmegaCallback {
+
+ private StreamObserver<GrpcTccCoordinateCommand> responseObserver;
+
+ public GrpcOmegaTccCallback(StreamObserver<GrpcTccCoordinateCommand>
responseObserver) {
+ this.responseObserver = responseObserver;
+ }
+
+ @Override
+ public void invoke(ParticipatedEvent event, TransactionStatus status) {
+ GrpcTccCoordinateCommand command = GrpcTccCoordinateCommand.newBuilder()
+ .setGlobalTxId(event.getGlobalTxId())
+ .setLocalTxId(event.getLocalTxId())
+ .setParentTxId(event.getParentTxId() == null ? "" :
event.getParentTxId())
+ .setServiceName(event.getServiceName())
+ .setMethod(TransactionStatus.Succeed.equals(status) ?
event.getConfirmMethod() : event.getCancelMethod())
+ .build();
+ responseObserver.onNext(command);
+ }
+
+ @Override
+ public void disconnect() {
+ responseObserver.onCompleted();
+ }
+}
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallback.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallback.java
new file mode 100644
index 00000000..02ae7a69
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.callback;
+
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
+public interface OmegaCallback {
+
+ void invoke(ParticipatedEvent event, TransactionStatus status);
+
+ default void disconnect() {
+ }
+}
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallbackWrapper.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallbackWrapper.java
new file mode 100644
index 00000000..62c183a5
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/OmegaCallbackWrapper.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.callback;
+
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import
org.apache.servicecomb.saga.alpha.server.tcc.registry.OmegaCallbacksRegistry;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
+public class OmegaCallbackWrapper implements OmegaCallback {
+
+ @Override
+ public void invoke(ParticipatedEvent event, TransactionStatus status) {
+ OmegaCallback omegaCallback =
OmegaCallbacksRegistry.retrieve(event.getServiceName(), event.getInstanceId());
+ try {
+ omegaCallback.invoke(event, status);
+ } catch (Exception ex) {
+ OmegaCallbacksRegistry.removeByValue(event.getServiceName(),
omegaCallback);
+ throw ex;
+ }
+ }
+}
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
new file mode 100644
index 00000000..bcdca6df
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/callback/TccCallbackEngine.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.callback;
+
+import java.lang.invoke.MethodHandles;
+import
org.apache.servicecomb.saga.alpha.server.tcc.registry.TransactionEventRegistry;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TccCallbackEngine implements CallbackEngine {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final OmegaCallbackWrapper omegaCallbackWrapper;
+
+ public TccCallbackEngine(OmegaCallbackWrapper omegaCallbackWrapper) {
+ this.omegaCallbackWrapper = omegaCallbackWrapper;
+ }
+
+ @Override
+ public boolean execute(GrpcTccTransactionEndedEvent request) {
+ boolean result = true;
+ for (ParticipatedEvent event :
TransactionEventRegistry.retrieve(request.getGlobalTxId())) {
+ try {
+ omegaCallbackWrapper.invoke(event,
TransactionStatus.valueOf(request.getStatus()));
+ } catch (Exception ex) {
+ logError(event, ex);
+ result = false;
+ }
+ }
+ return result;
+ }
+
+ private void logError(ParticipatedEvent event, Exception ex) {
+ LOG.error(
+ "Failed to invoke service [{}] instance [{}] with method [{}], global
tx id [{}] and local tx id [{}]",
+ event.getServiceName(),
+ event.getInstanceId(),
+ TransactionStatus.Succeed.equals(event.getStatus()) ?
event.getConfirmMethod() : event.getCancelMethod(),
+ event.getGlobalTxId(),
+ event.getLocalTxId(),
+ ex);
+ }
+}
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
new file mode 100644
index 00000000..7964be5e
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.event;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+
+public class ParticipateEventFactory {
+
+ public static ParticipatedEvent create(GrpcTccParticipatedEvent request) {
+ return new ParticipatedEvent(
+ request.getGlobalTxId(),
+ request.getLocalTxId(),
+ request.getParentTxId(),
+ request.getServiceName(),
+ request.getInstanceId(),
+ request.getConfirmMethod(),
+ request.getCancelMethod(),
+ TransactionStatus.valueOf(request.getStatus())
+ );
+ }
+}
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
new file mode 100644
index 00000000..40270c25
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.event;
+
+import org.apache.servicecomb.saga.common.TransactionStatus;
+
+public class ParticipatedEvent {
+
+ private String globalTxId;
+ private String localTxId;
+ private String parentTxId;
+ private String serviceName;
+ private String instanceId;
+ private String confirmMethod;
+ private String cancelMethod;
+ private TransactionStatus status;
+
+ public ParticipatedEvent(String globalTxId, String localTxId, String
parentTxId, String serviceName,
+ String instanceId, String confirmMethod, String cancelMethod,
TransactionStatus status) {
+ this.globalTxId = globalTxId;
+ this.localTxId = localTxId;
+ this.parentTxId = parentTxId;
+ this.serviceName = serviceName;
+ this.instanceId = instanceId;
+ this.confirmMethod = confirmMethod;
+ this.cancelMethod = cancelMethod;
+ this.status = status;
+ }
+
+ public String getGlobalTxId() {
+ return globalTxId;
+ }
+
+ public String getLocalTxId() {
+ return localTxId;
+ }
+
+ public String getParentTxId() {
+ return parentTxId;
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public String getInstanceId() {
+ return instanceId;
+ }
+
+ public String getConfirmMethod() {
+ return confirmMethod;
+ }
+
+ public String getCancelMethod() {
+ return cancelMethod;
+ }
+
+ public TransactionStatus getStatus() {
+ return status;
+ }
+}
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/OmegaCallbacksRegistry.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/OmegaCallbacksRegistry.java
new file mode 100644
index 00000000..02340b2d
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/OmegaCallbacksRegistry.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.registry;
+
+import static java.util.Collections.emptyMap;
+
+import io.grpc.stub.StreamObserver;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.saga.alpha.core.AlphaException;
+import
org.apache.servicecomb.saga.alpha.server.tcc.callback.GrpcOmegaTccCallback;
+import org.apache.servicecomb.saga.alpha.server.tcc.callback.OmegaCallback;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manage omega callbacks.
+ */
+public final class OmegaCallbacksRegistry {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final static Map<String, Map<String, OmegaCallback>> REGISTRY = new
ConcurrentHashMap<>();
+
+ public static Map<String, Map<String, OmegaCallback>> getRegistry() {
+ return REGISTRY;
+ }
+
+ /**
+ * Register omega TCC callback.
+ *
+ * @param request Grpc service config
+ * @param responseObserver stream observer
+ */
+ public static void register(GrpcServiceConfig request,
StreamObserver<GrpcTccCoordinateCommand> responseObserver) {
+ REGISTRY
+ .computeIfAbsent(request.getServiceName(), key -> new
ConcurrentHashMap<>())
+ .put(request.getInstanceId(), new
GrpcOmegaTccCallback(responseObserver));
+ }
+
+ /**
+ * Retrieve omega TCC callback by service name and instance id.
+ *
+ * @param serviceName service name
+ * @param instanceId instance id
+ * @return Grpc omega TCC callback
+ * @throws AlphaException trigger this exception while missing omega
callback by service name
+ */
+ public static OmegaCallback retrieve(String serviceName, String instanceId)
throws AlphaException {
+ Map<String, OmegaCallback> callbackMap =
REGISTRY.getOrDefault(serviceName, emptyMap());
+ if (callbackMap.isEmpty()) {
+ throw new AlphaException("No such omega callback found for service " +
serviceName);
+ }
+ OmegaCallback result = callbackMap.get(instanceId);
+ if (null == result) {
+ LOG.info("Cannot find the service with the instanceId {}, call the other
instance.", instanceId);
+ return callbackMap.values().iterator().next();
+ }
+ return result;
+ }
+
+ public static void removeByValue(String serviceName, OmegaCallback
omegaCallback) {
+ Map<String, OmegaCallback> callbackMap =
REGISTRY.getOrDefault(serviceName, emptyMap());
+ callbackMap.values().remove(omegaCallback);
+ }
+
+ /**
+ * Retrieve omega TCC callback by service name and instance id, then remove
it from registry.
+ *
+ * @param serviceName service name
+ * @param instanceId instance id
+ * @return Grpc omega TCC callback
+ */
+ public static OmegaCallback retrieveThenRemove(String serviceName, String
instanceId) {
+ return REGISTRY.getOrDefault(serviceName, emptyMap()).remove(instanceId);
+ }
+
+}
diff --git
a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/TransactionEventRegistry.java
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/TransactionEventRegistry.java
new file mode 100644
index 00000000..d135a8b4
--- /dev/null
+++
b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/registry/TransactionEventRegistry.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.server.tcc.registry;
+
+import java.lang.invoke.MethodHandles;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manage TCC transaction event.
+ */
+public final class TransactionEventRegistry {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final static Map<String, Set<ParticipatedEvent>> REGISTRY = new
ConcurrentHashMap<>();
+
+ /**
+ * Register participate event.
+ *
+ * @param participateEvent participate event
+ */
+ public static void register(ParticipatedEvent participateEvent) {
+ REGISTRY
+ .computeIfAbsent(participateEvent.getGlobalTxId(), key -> new
LinkedHashSet<>())
+ .add(participateEvent);
+
+ LOG.info("Registered participated event, global tx: {}, local tx: {},
parent id: {}, "
+ + "confirm: {}, cancel: {}, status: {}, service [{}] instanceId
[{}]",
+ participateEvent.getGlobalTxId(), participateEvent.getLocalTxId(),
participateEvent.getParentTxId(),
+ participateEvent.getConfirmMethod(),
participateEvent.getCancelMethod(), participateEvent.getStatus(),
+ participateEvent.getServiceName(), participateEvent.getInstanceId());
+ }
+
+ /**
+ * Retrieve participate event from registry.
+ *
+ * @param globalTxId global transaction id
+ * @return participate events
+ */
+ public static Set<ParticipatedEvent> retrieve(String globalTxId) {
+ return REGISTRY.get(globalTxId);
+ }
+}
diff --git
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
index 66e035ba..8f5122b2 100644
---
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
+++
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java
@@ -32,8 +32,10 @@
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
-import java.io.File;
-import java.util.Arrays;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import io.grpc.netty.NettyChannelBuilder;
+import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -41,10 +43,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
-
import javax.annotation.PostConstruct;
-import javax.net.ssl.SSLException;
-
import org.apache.servicecomb.saga.alpha.core.CommandRepository;
import org.apache.servicecomb.saga.alpha.core.EventScanner;
import org.apache.servicecomb.saga.alpha.core.OmegaCallback;
@@ -70,15 +69,8 @@
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
-import com.google.protobuf.ByteString;
-
-import io.grpc.ManagedChannel;
-import io.grpc.netty.NettyChannelBuilder;
-import io.grpc.stub.StreamObserver;
-
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {AlphaApplication.class, AlphaConfig.class},
properties = {
diff --git
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
new file mode 100644
index 00000000..a5030b1d
--- /dev/null
+++
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import io.grpc.ManagedChannel;
+import io.grpc.netty.NettyChannelBuilder;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.servicecomb.saga.alpha.server.AlphaApplication;
+import
org.apache.servicecomb.saga.alpha.server.tcc.callback.GrpcOmegaTccCallback;
+import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent;
+import
org.apache.servicecomb.saga.alpha.server.tcc.registry.OmegaCallbacksRegistry;
+import
org.apache.servicecomb.saga.alpha.server.tcc.registry.TransactionEventRegistry;
+import org.apache.servicecomb.saga.common.TransactionStatus;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent;
+import
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent;
+import
org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent;
+import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc;
+import
org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub;
+import
org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = {AlphaApplication.class},
+ properties = {
+ "alpha.server.host=0.0.0.0",
+ "alpha.server.port=8090"
+ })
+public class AlphaTccServerTest {
+
+ private static final int port = 8090;
+ protected static ManagedChannel clientChannel;
+
+ private final TccEventServiceStub asyncStub =
TccEventServiceGrpc.newStub(clientChannel);
+
+ private final TccEventServiceBlockingStub blockingStub =
TccEventServiceGrpc.newBlockingStub(clientChannel);
+
+ private final Queue<GrpcTccCoordinateCommand> receivedCommands = new
ConcurrentLinkedQueue<>();
+
+ private final TccCoordinateCommandStreamObserver commandStreamObserver =
+ new
TccCoordinateCommandStreamObserver(this::onReceivedCoordinateCommand,
receivedCommands);
+
+ private final String globalTxId = UUID.randomUUID().toString();
+ private final String localTxId = UUID.randomUUID().toString();
+ private final String parentTxId = UUID.randomUUID().toString();
+ private final String confirmMethod = "confirm";
+ private final String cancelMethod = "cancel";
+
+
+ private final String serviceName = uniquify("serviceName");
+ private final String instanceId = uniquify("instanceId");
+
+ private final GrpcServiceConfig serviceConfig =
GrpcServiceConfig.newBuilder()
+ .setServiceName(serviceName)
+ .setInstanceId(instanceId)
+ .build();
+
+ @BeforeClass
+ public static void setupClientChannel() {
+ clientChannel = NettyChannelBuilder.forAddress("localhost",
port).usePlaintext().build();
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ clientChannel.shutdown();
+ clientChannel = null;
+ }
+
+ @After
+ public void after() {
+ blockingStub.onDisconnected(serviceConfig);
+ }
+
+ @Test
+ public void assertOnConnect() {
+ asyncStub.onConnected(serviceConfig, commandStreamObserver);
+ awaitUntilConnected();
+ assertThat(
+ OmegaCallbacksRegistry.retrieve(serviceName, instanceId),
is(instanceOf(GrpcOmegaTccCallback.class))
+ );
+ }
+
+ @Test
+ public void assertOnDisConnect() {
+ asyncStub.onConnected(serviceConfig, commandStreamObserver);
+ awaitUntilConnected();
+ assertThat(
+ OmegaCallbacksRegistry.retrieve(serviceName, instanceId),
is(instanceOf(GrpcOmegaTccCallback.class))
+ );
+ blockingStub.onDisconnected(serviceConfig);
+ assertThat(commandStreamObserver.isCompleted(), is(true));
+ }
+
+ private void awaitUntilConnected() {
+ await().atMost(2, SECONDS).until(() -> null !=
(OmegaCallbacksRegistry.getRegistry().get(serviceName)));
+ }
+
+ @Test
+ public void assertOnParticipated() {
+ asyncStub.onConnected(serviceConfig, commandStreamObserver);
+ awaitUntilConnected();
+ blockingStub.participate(newParticipatedEvent("Succeed"));
+ assertThat(TransactionEventRegistry.retrieve(globalTxId).size(), is(1));
+ ParticipatedEvent event =
TransactionEventRegistry.retrieve(globalTxId).iterator().next();
+ assertThat(event.getGlobalTxId(), is(globalTxId));
+ assertThat(event.getLocalTxId(), is(localTxId));
+ assertThat(event.getInstanceId(), is(instanceId));
+ assertThat(event.getServiceName(), is(serviceName));
+ assertThat(event.getConfirmMethod(), is(confirmMethod));
+ assertThat(event.getCancelMethod(), is(cancelMethod));
+ assertThat(event.getStatus(), is(TransactionStatus.Succeed));
+ }
+
+ @Test
+ public void assertOnTccTransactionSucceedEnded() {
+ asyncStub.onConnected(serviceConfig, commandStreamObserver);
+ awaitUntilConnected();
+ blockingStub.onTccTransactionStarted(newTxStart());
+ blockingStub.participate(newParticipatedEvent("Succeed"));
+ blockingStub.onTccTransactionEnded(newTxEnd("Succeed"));
+
+ await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty());
+ assertThat(receivedCommands.size(), is(1));
+ GrpcTccCoordinateCommand command = receivedCommands.poll();
+ assertThat(command.getMethod(), is("confirm"));
+ assertThat(command.getGlobalTxId(), is(globalTxId));
+ assertThat(command.getServiceName(), is(serviceName));
+ }
+
+ @Test
+ public void assertOnTccTransactionFailedEnded() {
+ asyncStub.onConnected(serviceConfig, commandStreamObserver);
+ awaitUntilConnected();
+ blockingStub.onTccTransactionStarted(newTxStart());
+ blockingStub.participate(newParticipatedEvent("Succeed"));
+ blockingStub.onTccTransactionEnded(newTxEnd("Failed"));
+
+ await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty());
+ assertThat(receivedCommands.size(), is(1));
+ GrpcTccCoordinateCommand command = receivedCommands.poll();
+ assertThat(command.getMethod(), is("cancel"));
+ assertThat(command.getGlobalTxId(), is(globalTxId));
+ assertThat(command.getServiceName(), is(serviceName));
+ assertThat(commandStreamObserver.isCompleted(), is(false));
+ }
+
+ @Test
+ public void assertOnCallbackNotExist() {
+ asyncStub.onConnected(serviceConfig, commandStreamObserver);
+ awaitUntilConnected();
+
+ OmegaCallbacksRegistry.getRegistry().remove(serviceName);
+ blockingStub.onTccTransactionStarted(newTxStart());
+ blockingStub.participate(newParticipatedEvent("Succeed"));
+ GrpcAck result = blockingStub.onTccTransactionEnded(newTxEnd("Succeed"));
+ assertThat(result.getAborted(), is(true));
+ }
+
+ @Test
+ public void assertOnCallbacksExecuteError() {
+ asyncStub.onConnected(serviceConfig, commandStreamObserver);
+ awaitUntilConnected();
+
+ OmegaCallbacksRegistry.getRegistry().get(serviceName).put(instanceId, new
GrpcOmegaTccCallback(null));
+ blockingStub.onTccTransactionStarted(newTxStart());
+ blockingStub.participate(newParticipatedEvent("Succeed"));
+ GrpcAck result = blockingStub.onTccTransactionEnded(newTxEnd("Succeed"));
+
+ assertThat(result.getAborted(), is(true));
+ assertThat(OmegaCallbacksRegistry.getRegistry().get(serviceName).size(),
is(0));
+ }
+
+ @Test
+ public void assertOnSwitchOtherCallbackInstance() {
+ asyncStub.onConnected(serviceConfig, commandStreamObserver);
+ GrpcServiceConfig config = GrpcServiceConfig.newBuilder()
+ .setServiceName(serviceName)
+ .setInstanceId(uniquify("instanceId"))
+ .build();
+ asyncStub.onConnected(config, commandStreamObserver);
+
+ await().atMost(1, SECONDS).until(() ->
(OmegaCallbacksRegistry.getRegistry().get(serviceName) != null));
+ await().atMost(1, SECONDS).until(() ->
(OmegaCallbacksRegistry.getRegistry().get(serviceName).size() == 2));
+
+ OmegaCallbacksRegistry.getRegistry().get(serviceName).remove(instanceId);
+ blockingStub.onTccTransactionStarted(newTxStart());
+ blockingStub.participate(newParticipatedEvent("Succeed"));
+ GrpcAck result = blockingStub.onTccTransactionEnded(newTxEnd("Succeed"));
+
+ await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty());
+ assertThat(receivedCommands.size(), is(1));
+ GrpcTccCoordinateCommand command = receivedCommands.poll();
+ assertThat(command.getMethod(), is("confirm"));
+ assertThat(command.getGlobalTxId(), is(globalTxId));
+ assertThat(command.getServiceName(), is(serviceName));
+
+ assertThat(result.getAborted(), is(false));
+ }
+
+ private GrpcTccParticipatedEvent newParticipatedEvent(String status) {
+ return GrpcTccParticipatedEvent.newBuilder()
+ .setGlobalTxId(globalTxId)
+ .setLocalTxId(localTxId)
+ .setServiceName(serviceName)
+ .setInstanceId(instanceId)
+ .setCancelMethod(cancelMethod)
+ .setConfirmMethod(confirmMethod)
+ .setStatus(status)
+ .build();
+ }
+
+ private GrpcTccTransactionStartedEvent newTxStart() {
+ return GrpcTccTransactionStartedEvent.newBuilder()
+ .setGlobalTxId(globalTxId)
+ .setLocalTxId(localTxId)
+ .build();
+ }
+
+ private GrpcTccTransactionEndedEvent newTxEnd(String status) {
+ return GrpcTccTransactionEndedEvent.newBuilder()
+ .setGlobalTxId(globalTxId)
+ .setLocalTxId(localTxId)
+ .setStatus(status)
+ .build();
+ }
+
+ private GrpcAck onReceivedCoordinateCommand(GrpcTccCoordinateCommand
command) {
+ return GrpcAck.newBuilder().setAborted(false).build();
+ }
+
+}
diff --git
a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java
new file mode 100644
index 00000000..a520d899
--- /dev/null
+++
b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.saga.alpha.tcc.server;
+
+import io.grpc.stub.StreamObserver;
+import java.util.Queue;
+import java.util.function.Consumer;
+import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand;
+
+public class TccCoordinateCommandStreamObserver implements
StreamObserver<GrpcTccCoordinateCommand> {
+
+ private Queue<GrpcTccCoordinateCommand> receivedCommands;
+ private Consumer<GrpcTccCoordinateCommand> consumer;
+
+ public boolean isCompleted() {
+ return completed;
+ }
+
+ private boolean completed = false;
+
+ public TccCoordinateCommandStreamObserver(Consumer<GrpcTccCoordinateCommand>
consumer,
+ Queue<GrpcTccCoordinateCommand> receivedCommands) {
+ this.consumer = consumer;
+ this.receivedCommands = receivedCommands;
+ }
+
+ @Override
+ public void onNext(GrpcTccCoordinateCommand value) {
+ consumer.accept(value);
+ receivedCommands.add(value);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ }
+
+ @Override
+ public void onCompleted() {
+ completed = true;
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services