This is an automated email from the ASF dual-hosted git repository. seanyinx pushed a commit to branch SCB-149_service_aware_callback in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 6cbcf093ba3e0b05391f6a682420d03ee78ae6b2 Author: seanyinx <[email protected]> AuthorDate: Fri Dec 29 19:25:18 2017 +0800 SCB-149 added service name and instance id to contract Signed-off-by: seanyinx <[email protected]> --- .../io/servicecomb/saga/alpha/core/TxEvent.java | 17 ++++++- .../saga/alpha/core/TxConsistentServiceTest.java | 7 ++- alpha/alpha-server/pom.xml | 4 ++ .../saga/alpha/server/GrpcTxEventEndpointImpl.java | 2 + .../saga/alpha/server/TxEventEnvelope.java | 15 +++++- .../alpha/server/TxEventEnvelopeRepository.java | 2 +- .../src/main/resources/schema-mysql.sql | 2 + .../saga/alpha/server/AlphaIntegrationTest.java | 12 ++++- .../saga/integration/pack/tests/PackIT.java | 15 +++++- .../integration/pack/tests/TxEventEnvelope.java | 10 ++++ .../connector/grpc/GrpcClientMessageSender.java | 7 ++- .../saga/omega/context/ServiceConfig.java | 54 ++++++++-------------- .../saga/omega/spring/OmegaSpringConfig.java | 10 +++- .../src/main/proto/GrpcTxEvent.proto | 2 + 14 files changed, 114 insertions(+), 45 deletions(-) diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java index da46db8..6781cb5 100644 --- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java +++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxEvent.java @@ -20,6 +20,8 @@ package io.servicecomb.saga.alpha.core; import java.util.Date; public class TxEvent { + private String serviceName; + private String instanceId; private Date creationTime; private String globalTxId; private String localTxId; @@ -31,13 +33,18 @@ public class TxEvent { private TxEvent() { } - public TxEvent(Date creationTime, + public TxEvent( + String serviceName, + String instanceId, + Date creationTime, String globalTxId, String localTxId, String parentTxId, String type, String compensationMethod, byte[] payloads) { + this.serviceName = serviceName; + this.instanceId = instanceId; this.creationTime = creationTime; this.globalTxId = globalTxId; this.localTxId = localTxId; @@ -47,6 +54,14 @@ public class TxEvent { this.payloads = payloads; } + public String serviceName() { + return serviceName; + } + + public String instanceId() { + return instanceId; + } + public Date creationTime() { return creationTime; } diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java index 13a2674..9d9332d 100644 --- a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java +++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxConsistentServiceTest.java @@ -17,6 +17,7 @@ package io.servicecomb.saga.alpha.core; +import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static io.servicecomb.saga.alpha.core.EventType.SagaEndedEvent; import static io.servicecomb.saga.alpha.core.EventType.SagaStartedEvent; import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent; @@ -59,6 +60,8 @@ public class TxConsistentServiceTest { private final String globalTxId = UUID.randomUUID().toString(); private final String localTxId = UUID.randomUUID().toString(); private final String parentTxId = UUID.randomUUID().toString(); + private final String serviceName = uniquify("serviceName"); + private final String instanceId = uniquify("instanceId"); private final String compensationMethod = getClass().getCanonicalName(); private final List<CompensationContext> compensationContexts = new ArrayList<>(); @@ -107,11 +110,11 @@ public class TxConsistentServiceTest { } private TxEvent newEvent(EventType eventType) { - return new TxEvent(new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes()); + return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, eventType.name(), compensationMethod, "yeah".getBytes()); } private TxEvent eventOf(EventType eventType, byte[] payloads, String localTxId) { - return new TxEvent(new Date(), + return new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, UUID.randomUUID().toString(), diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml index 3af09c2..32647e6 100644 --- a/alpha/alpha-server/pom.xml +++ b/alpha/alpha-server/pom.xml @@ -84,6 +84,10 @@ <groupId>org.awaitility</groupId> <artifactId>awaitility</artifactId> </dependency> + <dependency> + <groupId>com.github.seanyinx</groupId> + <artifactId>unit-scaffolding</artifactId> + </dependency> </dependencies> <build> diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java index 42c597e..3c23a79 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java @@ -40,6 +40,8 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase { @Override public void reportEvent(GrpcTxEvent message, StreamObserver<GrpcEmpty> responseObserver) { txConsistentService.handle(new TxEvent( + message.getServiceName(), + message.getInstanceId(), new Date(message.getTimestamp()), message.getGlobalTxId(), message.getLocalTxId(), diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java index fa282b4..06a44dc 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelope.java @@ -42,13 +42,24 @@ class TxEventEnvelope { this.event = event; } - public TxEventEnvelope(String globalTxId, + public TxEventEnvelope( + String serviceName, + String instanceId, + String globalTxId, String localTxId, String parentTxId, String type, String compensationMethod, byte[] payloads) { - this.event = new TxEvent(new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads); + this.event = new TxEvent(serviceName, instanceId, new Date(), globalTxId, localTxId, parentTxId, type, compensationMethod, payloads); + } + + String serviceName() { + return event.serviceName(); + } + + String instanceId() { + return event.instanceId(); } public long creationTime() { diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java index 5f929a1..3c35cba 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/TxEventEnvelopeRepository.java @@ -26,7 +26,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEventEnvelope, Long TxEventEnvelope findByEventGlobalTxId(String globalTxId); @Query("SELECT DISTINCT new io.servicecomb.saga.alpha.server.TxEventEnvelope(" - + "t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads" + + "t.event.serviceName, t.event.instanceId, t.event.globalTxId, t.event.localTxId, t.event.parentTxId, t.event.type, t.event.compensationMethod, t.event.payloads" + ") FROM TxEventEnvelope t " + "WHERE t.event.globalTxId = ?1 AND t.event.type = ?2") List<TxEventEnvelope> findByEventGlobalTxIdAndEventType(String globalTxId, String type); diff --git a/alpha/alpha-server/src/main/resources/schema-mysql.sql b/alpha/alpha-server/src/main/resources/schema-mysql.sql index 2940804..bd98c2a 100644 --- a/alpha/alpha-server/src/main/resources/schema-mysql.sql +++ b/alpha/alpha-server/src/main/resources/schema-mysql.sql @@ -1,5 +1,7 @@ CREATE TABLE IF NOT EXISTS `tx_event_envelope` ( `surrogate_id` bigint NOT NULL AUTO_INCREMENT, + `service_name` varchar(16) NOT NULL, + `instance_id` varchar(36) NOT NULL, `creation_time` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), `global_tx_id` varchar(36) NOT NULL, `local_tx_id` varchar(36) NOT NULL, diff --git a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index 56a57eb..271977e 100644 --- a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -17,6 +17,7 @@ package io.servicecomb.saga.alpha.server; +import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent; import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent; import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent; @@ -70,6 +71,8 @@ public class AlphaIntegrationTest { private final String localTxId = UUID.randomUUID().toString(); private final String parentTxId = UUID.randomUUID().toString(); private final String compensationMethod = getClass().getCanonicalName(); + private final String serviceName = uniquify("serviceName"); + private final String instanceId = uniquify("instanceId"); @Autowired private TxEventEnvelopeRepository eventRepo; @@ -88,6 +91,8 @@ public class AlphaIntegrationTest { TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId); + assertThat(envelope.serviceName(), is(serviceName)); + assertThat(envelope.instanceId(), is(instanceId)); assertThat(envelope.globalTxId(), is(globalTxId)); assertThat(envelope.localTxId(), is(localTxId)); assertThat(envelope.parentTxId(), is(parentTxId)); @@ -118,6 +123,8 @@ public class AlphaIntegrationTest { private GrpcTxEvent someGrpcEvent(EventType type) { return GrpcTxEvent.newBuilder() + .setServiceName(serviceName) + .setInstanceId(instanceId) .setTimestamp(System.currentTimeMillis()) .setGlobalTxId(this.globalTxId) .setLocalTxId(this.localTxId) @@ -133,7 +140,10 @@ public class AlphaIntegrationTest { } private TxEventEnvelope eventEnvelopeOf(EventType eventType, String localTxId, String parentTxId, byte[] payloads) { - return new TxEventEnvelope(new TxEvent(new Date(), + return new TxEventEnvelope(new TxEvent( + serviceName, + instanceId, + new Date(), globalTxId, localTxId, parentTxId, diff --git a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/PackIT.java b/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/PackIT.java index 334e60c..fb330c8 100644 --- a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/PackIT.java +++ b/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/PackIT.java @@ -18,6 +18,7 @@ package io.servicecomb.saga.integration.pack.tests; import static io.servicecomb.saga.omega.context.OmegaContext.GLOBAL_TX_ID_KEY; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; @@ -42,8 +43,9 @@ import io.servicecomb.saga.omega.context.OmegaContext; @RunWith(SpringRunner.class) @SpringBootTest(classes = GreetingApplication.class, webEnvironment = WebEnvironment.DEFINED_PORT, - properties = {"server.port=8080"}) + properties = {"server.port=8080", "spring.application.name=greeting-service"}) public class PackIT { + private static final String serviceName = "greeting-service"; private final String globalTxId = UUID.randomUUID().toString(); @Autowired @@ -75,12 +77,23 @@ public class PackIT { assertThat(envelopes.size(), is(4)); assertThat(envelopes.get(0).type(), is("TxStartedEvent")); assertThat(envelopes.get(0).parentTxId(), is(nullValue())); + assertThat(envelopes.get(0).serviceName(), is(serviceName)); + assertThat(envelopes.get(0).instanceId(), is(notNullValue())); + assertThat(envelopes.get(1).type(), is("TxEndedEvent")); assertThat(envelopes.get(1).parentTxId(), is(nullValue())); + assertThat(envelopes.get(1).serviceName(), is(serviceName)); + assertThat(envelopes.get(1).instanceId(), is(notNullValue())); + assertThat(envelopes.get(2).type(), is("TxStartedEvent")); assertThat(envelopes.get(2).parentTxId(), is(envelopes.get(0).localTxId())); + assertThat(envelopes.get(2).serviceName(), is(serviceName)); + assertThat(envelopes.get(2).instanceId(), is(notNullValue())); + assertThat(envelopes.get(3).type(), is("TxEndedEvent")); assertThat(envelopes.get(3).parentTxId(), is(envelopes.get(0).localTxId())); + assertThat(envelopes.get(3).serviceName(), is(serviceName)); + assertThat(envelopes.get(3).instanceId(), is(notNullValue())); } } diff --git a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java b/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java index ae7a302..206088d 100644 --- a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java +++ b/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java @@ -29,6 +29,8 @@ class TxEventEnvelope { @GeneratedValue private long surrogateId; + private String serviceName; + private String instanceId; private Date creationTime; private String globalTxId; private String localTxId; @@ -39,6 +41,14 @@ class TxEventEnvelope { private TxEventEnvelope() { } + String serviceName() { + return serviceName; + } + + String instanceId() { + return instanceId; + } + String localTxId() { return localTxId; } diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java index 16f94b3..2c56247 100644 --- a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java +++ b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java @@ -23,6 +23,7 @@ package io.servicecomb.saga.omega.connector.grpc; import com.google.protobuf.ByteString; import io.grpc.ManagedChannel; +import io.servicecomb.saga.omega.context.ServiceConfig; import io.servicecomb.saga.omega.transaction.MessageSender; import io.servicecomb.saga.omega.transaction.MessageSerializer; import io.servicecomb.saga.omega.transaction.TxEvent; @@ -36,10 +37,12 @@ public class GrpcClientMessageSender implements MessageSender { private final TxEventServiceBlockingStub eventService; private final MessageSerializer serializer; + private final ServiceConfig serviceConfig; - public GrpcClientMessageSender(ManagedChannel eventService, MessageSerializer serializer) { + public GrpcClientMessageSender(ManagedChannel eventService, MessageSerializer serializer, ServiceConfig serviceConfig) { this.eventService = TxEventServiceGrpc.newBlockingStub(eventService); this.serializer = serializer; + this.serviceConfig = serviceConfig; } @Override @@ -51,6 +54,8 @@ public class GrpcClientMessageSender implements MessageSender { ByteString payloads = ByteString.copyFrom(serializer.serialize(event.payloads())); Builder builder = GrpcTxEvent.newBuilder() + .setServiceName(serviceConfig.serviceName()) + .setInstanceId(serviceConfig.instanceId()) .setTimestamp(event.timestamp()) .setGlobalTxId(event.globalTxId()) .setLocalTxId(event.localTxId()) diff --git a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/ServiceConfig.java similarity index 56% copy from integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java copy to omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/ServiceConfig.java index ae7a302..53671ad 100644 --- a/integration-tests/pack-tests/src/test/java/io/servicecomb/saga/integration/pack/tests/TxEventEnvelope.java +++ b/omega/omega-context/src/main/java/io/servicecomb/saga/omega/context/ServiceConfig.java @@ -15,43 +15,29 @@ * limitations under the License. */ -package io.servicecomb.saga.integration.pack.tests; - -import java.util.Date; - -import javax.persistence.Entity; -import javax.persistence.GeneratedValue; -import javax.persistence.Id; - -@Entity -class TxEventEnvelope { - @Id - @GeneratedValue - private long surrogateId; - - private Date creationTime; - private String globalTxId; - private String localTxId; - private String parentTxId; - private String type; - private byte[] payloads; - - private TxEventEnvelope() { - } - - String localTxId() { - return localTxId; - } - - String parentTxId() { - return parentTxId; +package io.servicecomb.saga.omega.context; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class ServiceConfig { + private final String serviceName; + private final String instanceId; + + public ServiceConfig(String serviceName) { + this.serviceName = serviceName; + try { + instanceId = serviceName + "-" + InetAddress.getLocalHost().getHostAddress(); + } catch (UnknownHostException e) { + throw new IllegalStateException(e); + } } - String type() { - return type; + public String serviceName() { + return serviceName; } - public byte[] payloads() { - return payloads; + public String instanceId() { + return instanceId; } } diff --git a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java index 73e2212..28181f4 100644 --- a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java +++ b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java @@ -35,6 +35,7 @@ import io.grpc.ManagedChannelBuilder; import io.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender; import io.servicecomb.saga.omega.context.IdGenerator; import io.servicecomb.saga.omega.context.OmegaContext; +import io.servicecomb.saga.omega.context.ServiceConfig; import io.servicecomb.saga.omega.context.UniqueIdGenerator; import io.servicecomb.saga.omega.format.NativeMessageFormat; import io.servicecomb.saga.omega.transaction.MessageSender; @@ -55,17 +56,22 @@ class OmegaSpringConfig { return new OmegaContext(idGenerator); } + @Bean + ServiceConfig serviceConfig(@Value("${spring.application.name}") String serviceName) { + return new ServiceConfig(serviceName); + } + @PreDestroy void close() { channels.forEach(ManagedChannel::shutdown); } @Bean - MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses) { + MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses, ServiceConfig serviceConfig) { // TODO: 2017/12/26 connect to the one with lowest latency for (String address : addresses) { try { - return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat()); + return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat(), serviceConfig); } catch (Exception e) { log.error("Unable to connect to alpha at {}", address, e); } diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto index f6ebf74..743df4d 100644 --- a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto +++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto @@ -33,6 +33,8 @@ message GrpcTxEvent { string type = 5; string compensationMethod = 6; bytes payloads = 7; + string serviceName = 8; + string instanceId = 9; } message GrpcEmpty {} -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
