This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 3f7377693c768f563cac5f1ff8211f5bc05057d3 Author: cherrylzhao <[email protected]> AuthorDate: Fri Aug 24 17:53:26 2018 +0800 SCB-856 Add junit test case & resolve related bugs. --- .../alpha/server/tcc/GrpcOmegaTccCallback.java | 10 +- .../saga/alpha/server/tcc/GrpcTccEventService.java | 19 +++- .../saga/alpha/server/tcc/OmegaCallback.java | 3 +- .../alpha/server/tcc/OmegaCallbacksRegistry.java | 18 +++- .../alpha/server/tcc/TransactionEventRegistry.java | 10 +- .../server/tcc/event/ParticipateEventFactory.java | 4 +- .../alpha/server/tcc/event/ParticipatedEvent.java | 44 ++------- .../saga/alpha/server/AlphaIntegrationTest.java | 3 +- .../alpha/server/AlphaIntegrationWithSSLTest.java | 3 +- .../saga/alpha/tcc/server/AlphaTccServerTest.java | 110 +++++++++++++++++++-- .../TccCoordinateCommandStreamObserver.java | 9 +- 11 files changed, 164 insertions(+), 69 deletions(-) diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java index e5364b0..8ea7cfb 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcOmegaTccCallback.java @@ -34,13 +34,19 @@ public final class GrpcOmegaTccCallback implements OmegaCallback { } @Override - public void compensate(ParticipatedEvent event, TransactionStatus status) { + public void invoke(ParticipatedEvent event, String status) { GrpcTccCoordinateCommand command = GrpcTccCoordinateCommand.newBuilder() .setGlobalTxId(event.getGlobalTxId()) .setLocalTxId(event.getLocalTxId()) .setParentTxId(event.getParentTxId() == null ? "" : event.getParentTxId()) - .setMethod(TransactionStatus.Succeed.equals(status) ? event.getConfirmMethod() : event.getCancelMethod()) + .setServiceName(event.getServiceName()) + .setMethod("Succeed".equals(status) ? event.getConfirmMethod() : event.getCancelMethod()) .build(); responseObserver.onNext(command); } + + @Override + public void disconnect() { + responseObserver.onCompleted(); + } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java index 148a0e9..cd61162 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/GrpcTccEventService.java @@ -18,8 +18,9 @@ package org.apache.servicecomb.saga.alpha.server.tcc; import io.grpc.stub.StreamObserver; -import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent; +import org.apache.servicecomb.saga.alpha.core.AlphaException; import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipateEventFactory; +import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand; @@ -43,6 +44,8 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl @Override public void onTccTransactionStarted(GrpcTccTransactionStartedEvent request, StreamObserver<GrpcAck> responseObserver) { + responseObserver.onNext(ALLOW); + responseObserver.onCompleted(); } @Override @@ -54,8 +57,13 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl @Override public void onTccTransactionEnded(GrpcTccTransactionEndedEvent request, StreamObserver<GrpcAck> responseObserver) { - for (ParticipatedEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) { - OmegaCallbacksRegistry.retrieve(event.getServiceName(), event.getInstanceId()).compensate(event, event.getStatus()); + try { + for (ParticipatedEvent event : TransactionEventRegistry.retrieve(request.getGlobalTxId())) { + OmegaCallbacksRegistry.retrieve(event.getServiceName(), + event.getInstanceId()).invoke(event, request.getStatus()); + } + } catch (AlphaException ex) { + responseObserver.onNext(REJECT); } responseObserver.onNext(ALLOW); responseObserver.onCompleted(); @@ -63,7 +71,10 @@ public class GrpcTccEventService extends TccEventServiceGrpc.TccEventServiceImpl @Override public void onDisconnected(GrpcServiceConfig request, StreamObserver<GrpcAck> responseObserver) { - OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(), request.getInstanceId()).disconnect(); + OmegaCallback omegaCallback = OmegaCallbacksRegistry.retrieveThenRemove(request.getServiceName(), request.getInstanceId()); + if (null != omegaCallback) { + omegaCallback.disconnect(); + } responseObserver.onNext(ALLOW); responseObserver.onCompleted(); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java index 3c19cbb..369472c 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallback.java @@ -18,11 +18,10 @@ package org.apache.servicecomb.saga.alpha.server.tcc; import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent; -import org.apache.servicecomb.saga.common.TransactionStatus; public interface OmegaCallback { - void compensate(ParticipatedEvent event, TransactionStatus status); + void invoke(ParticipatedEvent event, String status); default void disconnect() { } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java index ef075a5..834a5a2 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/OmegaCallbacksRegistry.java @@ -22,6 +22,7 @@ import static java.util.Collections.emptyMap; import io.grpc.stub.StreamObserver; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.servicecomb.saga.alpha.core.AlphaException; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand; @@ -32,6 +33,10 @@ public final class OmegaCallbacksRegistry { private final static Map<String, Map<String, OmegaCallback>> REGISTRY = new ConcurrentHashMap<>(); + public static Map<String, Map<String, OmegaCallback>> getRegistry() { + return REGISTRY; + } + /** * Register omega TCC callback. * @@ -50,9 +55,18 @@ public final class OmegaCallbacksRegistry { * @param serviceName service name * @param instanceId instance id * @return Grpc omega TCC callback + * @throws AlphaException trigger this exception while missing omega callback by service name */ - public static OmegaCallback retrieve(String serviceName, String instanceId) { - return REGISTRY.getOrDefault(serviceName, emptyMap()).get(instanceId); + public static OmegaCallback retrieve(String serviceName, String instanceId) throws AlphaException { + Map<String, OmegaCallback> callbackMap = REGISTRY.getOrDefault(serviceName, emptyMap()); + if (callbackMap.isEmpty()) { + throw new AlphaException("No such omega callback found for service " + serviceName); + } + OmegaCallback result = callbackMap.get(instanceId); + if (null == result) { + return callbackMap.values().iterator().next(); + } + return result; } /** diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java index a2e3ddc..6218304 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/TransactionEventRegistry.java @@ -17,9 +17,9 @@ package org.apache.servicecomb.saga.alpha.server.tcc; -import java.util.LinkedList; -import java.util.List; +import java.util.LinkedHashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent; @@ -28,7 +28,7 @@ import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent; */ public final class TransactionEventRegistry { - private final static Map<String, List<ParticipatedEvent>> REGISTRY = new ConcurrentHashMap<>(); + private final static Map<String, Set<ParticipatedEvent>> REGISTRY = new ConcurrentHashMap<>(); /** * Register participate event. @@ -37,7 +37,7 @@ public final class TransactionEventRegistry { */ public static void register(ParticipatedEvent participateEvent) { REGISTRY - .computeIfAbsent(participateEvent.getGlobalTxId(), key -> new LinkedList<>()) + .computeIfAbsent(participateEvent.getGlobalTxId(), key -> new LinkedHashSet<>()) .add(participateEvent); } @@ -47,7 +47,7 @@ public final class TransactionEventRegistry { * @param globalTxId global transaction id * @return participate events */ - public static List<ParticipatedEvent> retrieve(String globalTxId) { + public static Set<ParticipatedEvent> retrieve(String globalTxId) { return REGISTRY.get(globalTxId); } } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java index 3c7523f..7964be5 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipateEventFactory.java @@ -27,10 +27,10 @@ public class ParticipateEventFactory { request.getGlobalTxId(), request.getLocalTxId(), request.getParentTxId(), - request.getConfirmMethod(), - request.getCancelMethod(), request.getServiceName(), request.getInstanceId(), + request.getConfirmMethod(), + request.getCancelMethod(), TransactionStatus.valueOf(request.getStatus()) ); } diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java index 67c84ac..40270c2 100644 --- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java +++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/saga/alpha/server/tcc/event/ParticipatedEvent.java @@ -46,63 +46,31 @@ public class ParticipatedEvent { return globalTxId; } - public void setGlobalTxId(String globalTxId) { - this.globalTxId = globalTxId; - } - public String getLocalTxId() { return localTxId; } - public void setLocalTxId(String localTxId) { - this.localTxId = localTxId; - } - public String getParentTxId() { return parentTxId; } - public void setParentTxId(String parentTxId) { - this.parentTxId = parentTxId; - } - - public String getConfirmMethod() { - return confirmMethod; - } - - public void setConfirmMethod(String confirmMethod) { - this.confirmMethod = confirmMethod; - } - - public String getCancelMethod() { - return cancelMethod; - } - - public void setCancelMethod(String cancelMethod) { - this.cancelMethod = cancelMethod; - } - public String getServiceName() { return serviceName; } - public void setServiceName(String serviceName) { - this.serviceName = serviceName; - } - public String getInstanceId() { return instanceId; } - public void setInstanceId(String instanceId) { - this.instanceId = instanceId; + public String getConfirmMethod() { + return confirmMethod; } - public TransactionStatus getStatus() { - return status; + public String getCancelMethod() { + return cancelMethod; } - public void setStatus(TransactionStatus status) { - this.status = status; + public TransactionStatus getStatus() { + return status; } } diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index 66e035b..1af6b05 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -84,7 +84,8 @@ import io.grpc.stub.StreamObserver; properties = { "alpha.server.host=0.0.0.0", "alpha.server.port=8090", - "alpha.event.pollingInterval=1" + "alpha.event.pollingInterval=1", + "alpha.mode=SAGA" }) public class AlphaIntegrationTest { private static final int port = 8090; diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java index 8a2df82..e14775c 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/server/AlphaIntegrationWithSSLTest.java @@ -39,7 +39,8 @@ import io.netty.handler.ssl.SslProvider; properties = { "alpha.server.host=0.0.0.0", "alpha.server.port=8092", - "alpha.event.pollingInterval=1" + "alpha.event.pollingInterval=1", + "alpha.mode=SAGA" }) public class AlphaIntegrationWithSSLTest extends AlphaIntegrationTest { private static final int port = 8092; diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java index d421075..102141e 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java @@ -18,6 +18,11 @@ package org.apache.servicecomb.saga.alpha.tcc.server; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; import io.grpc.ManagedChannel; import io.grpc.netty.NettyChannelBuilder; @@ -25,10 +30,17 @@ import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.servicecomb.saga.alpha.server.AlphaApplication; -import org.apache.servicecomb.saga.alpha.tcc.server.common.TccCoordinateCommandStreamObserver; +import org.apache.servicecomb.saga.alpha.server.tcc.GrpcOmegaTccCallback; +import org.apache.servicecomb.saga.alpha.server.tcc.OmegaCallbacksRegistry; +import org.apache.servicecomb.saga.alpha.server.tcc.TransactionEventRegistry; +import org.apache.servicecomb.saga.alpha.server.tcc.event.ParticipatedEvent; +import org.apache.servicecomb.saga.common.TransactionStatus; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig; import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccParticipatedEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionEndedEvent; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccTransactionStartedEvent; import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc; import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub; import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub; @@ -45,20 +57,19 @@ import org.springframework.test.context.junit4.SpringRunner; @SpringBootTest(classes = {AlphaApplication.class}, properties = { "alpha.server.host=0.0.0.0", - "alpha.server.tcc-port=8090", - "alpha.event.pollingInterval=1", + "alpha.server.tcc-port=8190", "alpha.mode=TCC" }) public class AlphaTccServerTest { - private static final int port = 8090; + private static final int port = 8190; protected static ManagedChannel clientChannel; private final TccEventServiceStub asyncStub = TccEventServiceGrpc.newStub(clientChannel); private final TccEventServiceBlockingStub blockingStub = TccEventServiceGrpc.newBlockingStub(clientChannel); - private static final Queue<GrpcTccCoordinateCommand> receivedCommands = new ConcurrentLinkedQueue<>(); + private final Queue<GrpcTccCoordinateCommand> receivedCommands = new ConcurrentLinkedQueue<>(); private final TccCoordinateCommandStreamObserver commandStreamObserver = new TccCoordinateCommandStreamObserver(this::onCompensation, receivedCommands); @@ -66,7 +77,9 @@ public class AlphaTccServerTest { private final String globalTxId = UUID.randomUUID().toString(); private final String localTxId = UUID.randomUUID().toString(); private final String parentTxId = UUID.randomUUID().toString(); - private final String compensationMethod = getClass().getCanonicalName(); + private final String confirmMethod = "confirm"; + private final String cancelMethod = "cancel"; + private final String serviceName = uniquify("serviceName"); private final String instanceId = uniquify("instanceId"); @@ -94,12 +107,95 @@ public class AlphaTccServerTest { @After public void after() { -// blockingStub.onDisconnected(serviceConfig); + blockingStub.onDisconnected(serviceConfig); } @Test public void assertOnConnect() { asyncStub.onConnected(serviceConfig, commandStreamObserver); + awaitUntilConnected(); + assertThat( + OmegaCallbacksRegistry.retrieve(serviceName, instanceId), is(instanceOf(GrpcOmegaTccCallback.class)) + ); + } + + private void awaitUntilConnected() { + await().atMost(2, SECONDS).until(() -> null != (OmegaCallbacksRegistry.getRegistry().get(serviceName))); + } + + @Test + public void assertOnParticipated() { + asyncStub.onConnected(serviceConfig, commandStreamObserver); + awaitUntilConnected(); + blockingStub.participate(newParticipatedEvent("Succeed")); + assertThat(TransactionEventRegistry.retrieve(globalTxId).size(), is(1)); + ParticipatedEvent event = TransactionEventRegistry.retrieve(globalTxId).iterator().next(); + assertThat(event.getGlobalTxId(), is(globalTxId)); + assertThat(event.getLocalTxId(), is(localTxId)); + assertThat(event.getInstanceId(), is(instanceId)); + assertThat(event.getServiceName(), is(serviceName)); + assertThat(event.getConfirmMethod(), is(confirmMethod)); + assertThat(event.getCancelMethod(), is(cancelMethod)); + assertThat(event.getStatus(), is(TransactionStatus.Succeed)); + } + + @Test + public void assertOnTccTransactionSucceedEnded() { + asyncStub.onConnected(serviceConfig, commandStreamObserver); + awaitUntilConnected(); + blockingStub.onTccTransactionStarted(newTxStart()); + blockingStub.participate(newParticipatedEvent("Succeed")); + blockingStub.onTccTransactionEnded(newTxEnd("Succeed")); + + await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty()); + assertThat(receivedCommands.size(), is(1)); + GrpcTccCoordinateCommand command = receivedCommands.poll(); + assertThat(command.getMethod(), is("confirm")); + assertThat(command.getGlobalTxId(), is(globalTxId)); + assertThat(command.getServiceName(), is(serviceName)); + } + + @Test + public void assertOnTccTransactionFailedEnded() { + asyncStub.onConnected(serviceConfig, commandStreamObserver); + awaitUntilConnected(); + blockingStub.onTccTransactionStarted(newTxStart()); + blockingStub.participate(newParticipatedEvent("Succeed")); + blockingStub.onTccTransactionEnded(newTxEnd("Failed")); + + await().atMost(2, SECONDS).until(() -> !receivedCommands.isEmpty()); + assertThat(receivedCommands.size(), is(1)); + GrpcTccCoordinateCommand command = receivedCommands.poll(); + assertThat(command.getMethod(), is("cancel")); + assertThat(command.getGlobalTxId(), is(globalTxId)); + assertThat(command.getServiceName(), is(serviceName)); + } + + private GrpcTccParticipatedEvent newParticipatedEvent(String status) { + return GrpcTccParticipatedEvent.newBuilder() + .setGlobalTxId(globalTxId) + .setLocalTxId(localTxId) + .setServiceName(serviceName) + .setInstanceId(instanceId) + .setCancelMethod(cancelMethod) + .setConfirmMethod(confirmMethod) + .setStatus(status) + .build(); + } + + private GrpcTccTransactionStartedEvent newTxStart() { + return GrpcTccTransactionStartedEvent.newBuilder() + .setGlobalTxId(globalTxId) + .setLocalTxId(localTxId) + .build(); + } + + private GrpcTccTransactionEndedEvent newTxEnd(String status) { + return GrpcTccTransactionEndedEvent.newBuilder() + .setGlobalTxId(globalTxId) + .setLocalTxId(localTxId) + .setStatus(status) + .build(); } private GrpcAck onCompensation(GrpcTccCoordinateCommand command) { diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java similarity index 85% rename from alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java rename to alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java index cc39a8c..f97c5d8 100644 --- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/TccCoordinateCommandStreamObserver.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.servicecomb.saga.alpha.tcc.server.common; +package org.apache.servicecomb.saga.alpha.tcc.server; import io.grpc.stub.StreamObserver; import java.util.Queue; @@ -24,14 +24,14 @@ import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCoordinateCommand; public class TccCoordinateCommandStreamObserver implements StreamObserver<GrpcTccCoordinateCommand> { - private static Queue<GrpcTccCoordinateCommand> receivedCommands; - private Consumer<GrpcTccCoordinateCommand> consumer; + private Queue<GrpcTccCoordinateCommand> receivedCommands; + private Consumer<GrpcTccCoordinateCommand> consumer; private boolean completed = false; public TccCoordinateCommandStreamObserver(Consumer<GrpcTccCoordinateCommand> consumer, Queue<GrpcTccCoordinateCommand> receivedCommands) { this.consumer = consumer; - TccCoordinateCommandStreamObserver.receivedCommands = receivedCommands; + this.receivedCommands = receivedCommands; } @Override @@ -42,7 +42,6 @@ public class TccCoordinateCommandStreamObserver implements StreamObserver<GrpcTc @Override public void onError(Throwable t) { - } @Override
