This is an automated email from the ASF dual-hosted git repository. seanyinx pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit b4ced3e24bfdf3cc23eb8a8bdaf37bde30e9587b Author: Eric Lee <[email protected]> AuthorDate: Thu Dec 28 20:53:51 2017 +0800 SCB-138 replace thrift with grpc Signed-off-by: Eric Lee <[email protected]> --- alpha/alpha-server/pom.xml | 13 ++++- .../servicecomb/saga/alpha/server/AlphaConfig.java | 20 +++++-- .../{ThriftStartable.java => GrpcStartable.java} | 42 ++++++++------ .../saga/alpha/server/GrpcTxEventEndpointImpl.java | 54 ++++++++++++++++++ .../saga/alpha/server/ServerStartable.java | 9 ++- .../saga/alpha/server/ThriftStartable.java | 5 +- .../saga/alpha/server/AlphaIntegrationTest.java | 52 +++++++++++++----- .../omega-connector-grpc}/pom.xml | 43 ++++++++++++--- .../connector/grpc/GrpcClientMessageSender.java | 61 +++++++++++++++++++++ .../connector/grpc/GrpcTxEventEndpointImpl.java} | 23 +++++++- .../grpc/GrpcClientMessageSenderTest.java} | 64 +++++++++++++--------- .../connector/thrift/ThriftMessageSenderTest.java | 22 +++++--- omega/omega-connector/pom.xml | 1 + .../saga/omega/format/NativeMessageFormat.java | 13 ++++- omega/omega-spring-starter/pom.xml | 12 ++++ .../saga/omega/spring/OmegaSpringConfig.java | 34 +++++++++++- .../saga/omega/transaction/MessageSerializer.java | 2 + .../pack-contract-grpc}/pom.xml | 36 ++++++++---- .../pack/contract/grpc/GrpcTxEventEndpoint.java | 9 ++- .../src/main/proto/GrpcTxEvent.proto | 37 +++++++++++++ pack-contracts/pom.xml | 1 + pom.xml | 44 +++++++++++++++ 22 files changed, 494 insertions(+), 103 deletions(-) diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml index 774883d..0c30904 100644 --- a/alpha/alpha-server/pom.xml +++ b/alpha/alpha-server/pom.xml @@ -54,6 +54,18 @@ <artifactId>pack-contract-thrift</artifactId> </dependency> <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty</artifactId> + </dependency> + <dependency> + <groupId>io.servicecomb.saga</groupId> + <artifactId>pack-contract-grpc</artifactId> + </dependency> + <dependency> <groupId>io.servicecomb.saga</groupId> <artifactId>alpha-core</artifactId> </dependency> @@ -84,7 +96,6 @@ <groupId>org.awaitility</groupId> <artifactId>awaitility</artifactId> </dependency> - </dependencies> <build> diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java index d443fa7..a453c81 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java @@ -43,15 +43,27 @@ class AlphaConfig { TxEventRepository eventRepository = new SpringTxEventRepository(eventRepo); - ThriftStartable startable = new ThriftStartable( + ServerStartable startable = buildGrpc(port, omegaCallback, eventRepository); + CompletableFuture.runAsync(startable::start); + + return eventRepository; + } + + private ServerStartable buildThrift(int port, OmegaCallback omegaCallback, TxEventRepository eventRepository) { + return new ThriftStartable( port, new SwiftTxEventEndpointImpl( new TxConsistentService( eventRepository, omegaCallback))); + } - CompletableFuture.runAsync(startable::start); - - return eventRepository; + private ServerStartable buildGrpc(int port, OmegaCallback omegaCallback, TxEventRepository eventRepository) { + return new GrpcStartable( + port, + new GrpcTxEventEndpointImpl( + new TxConsistentService( + eventRepository, + omegaCallback))); } } diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/ThriftStartable.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcStartable.java similarity index 51% copy from alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/ThriftStartable.java copy to alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcStartable.java index 71acc2f..6affb62 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/ThriftStartable.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcStartable.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -13,31 +14,40 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * */ package io.servicecomb.saga.alpha.server; -import java.util.Collections; +import java.io.IOException; +import java.util.Arrays; + +import io.grpc.BindableService; +import io.grpc.Server; +import io.grpc.ServerBuilder; -import com.facebook.swift.codec.ThriftCodecManager; -import com.facebook.swift.service.ThriftServer; -import com.facebook.swift.service.ThriftServerConfig; -import com.facebook.swift.service.ThriftServiceProcessor; +class GrpcStartable implements ServerStartable { -class ThriftStartable { - private final ThriftServer server; + private final Server server; - ThriftStartable(int port, Object... services) { - server = new ThriftServer( - new ThriftServiceProcessor(new ThriftCodecManager(), - Collections.emptyList(), - services), - new ThriftServerConfig().setPort(port)); + GrpcStartable(int port, BindableService... services) { + ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port); + Arrays.stream(services).forEach(serverBuilder::addService); + server = serverBuilder.build(); } - void start() { - Runtime.getRuntime().addShutdownHook(new Thread(server::close)); + @Override + public void start() { + Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown)); - server.start(); + try { + server.start(); + server.awaitTermination(); + } catch (IOException e) { + throw new IllegalStateException("Unable to start grpc server.", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java new file mode 100644 index 0000000..278183c --- /dev/null +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package io.servicecomb.saga.alpha.server; + +import java.util.Date; + +import io.grpc.stub.StreamObserver; +import io.servicecomb.saga.alpha.core.TxConsistentService; +import io.servicecomb.saga.alpha.core.TxEvent; +import io.servicecomb.saga.pack.contract.grpc.GrpcEmpty; +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent; +import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase; + +class GrpcTxEventEndpointImpl extends TxEventServiceImplBase { + + private final TxConsistentService txConsistentService; + + GrpcTxEventEndpointImpl(TxConsistentService txConsistentService) { + this.txConsistentService = txConsistentService; + } + + @Override + public void reportEvent(GrpcTxEvent message, StreamObserver<GrpcEmpty> responseObserver) { + txConsistentService.handle(new TxEvent( + new Date(message.getTimestamp()), + message.getGlobalTxId(), + message.getLocalTxId(), + message.getParentTxId(), + message.getType(), + message.getPayloads().toByteArray() + )); + GrpcEmpty reply = GrpcEmpty.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } +} diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/ServerStartable.java similarity index 87% copy from omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java copy to alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/ServerStartable.java index b1eb7fd..4657cec 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/ServerStartable.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -13,10 +14,12 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * */ -package io.servicecomb.saga.omega.transaction; +package io.servicecomb.saga.alpha.server; -public interface MessageSerializer { - byte[] serialize(TxEvent event); +interface ServerStartable { + void start(); } diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/ThriftStartable.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/ThriftStartable.java index 71acc2f..0da6a5c 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/ThriftStartable.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/ThriftStartable.java @@ -24,7 +24,7 @@ import com.facebook.swift.service.ThriftServer; import com.facebook.swift.service.ThriftServerConfig; import com.facebook.swift.service.ThriftServiceProcessor; -class ThriftStartable { +class ThriftStartable implements ServerStartable { private final ThriftServer server; ThriftStartable(int port, Object... services) { @@ -35,7 +35,8 @@ class ThriftStartable { new ThriftServerConfig().setPort(port)); } - void start() { + @Override + public void start() { Runtime.getRuntime().addShutdownHook(new Thread(server::close)); server.start(); diff --git a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index f7ec33c..1be42f9 100644 --- a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -17,7 +17,6 @@ package io.servicecomb.saga.alpha.server; -import static com.google.common.net.HostAndPort.fromParts; import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent; import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent; import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent; @@ -45,23 +44,31 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.test.context.junit4.SpringRunner; -import com.facebook.nifty.client.FramedClientConnector; -import com.facebook.swift.service.ThriftClientManager; +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import io.servicecomb.saga.alpha.core.EventType; import io.servicecomb.saga.alpha.core.OmegaCallback; import io.servicecomb.saga.alpha.core.TxEvent; import io.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig; +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent; +import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc; +import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub; import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent; -import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint; @RunWith(SpringRunner.class) @SpringBootTest(classes = {AlphaApplication.class, OmegaCallbackConfig.class}, properties = "alpha.server.port=8090") public class AlphaIntegrationTest { - private static final ThriftClientManager clientManager = new ThriftClientManager(); - private static final String payload = "hello world"; + private static final int port = 8090; + + // private static final ThriftClientManager clientManager = new ThriftClientManager(); + private static ManagedChannel clientChannel = ManagedChannelBuilder + .forAddress("localhost", port).usePlaintext(true).build(); - private final int port = 8090; + private TxEventServiceBlockingStub stub = TxEventServiceGrpc.newBlockingStub(clientChannel); + + private static final String payload = "hello world"; private final String globalTxId = UUID.randomUUID().toString(); private final String localTxId = UUID.randomUUID().toString(); @@ -74,27 +81,30 @@ public class AlphaIntegrationTest { @Autowired private List<CompensationContext> compensationContexts; - private final FramedClientConnector connector = new FramedClientConnector(fromParts("localhost", port)); - private SwiftTxEventEndpoint endpoint; +// private final FramedClientConnector connector = new FramedClientConnector(fromParts("localhost", port)); +// private SwiftTxEventEndpoint endpoint; + @AfterClass public static void tearDown() throws Exception { - clientManager.close(); + clientChannel.shutdown(); +// clientManager.close(); } @Before - public void setUp() throws Exception { - endpoint = clientManager.createClient(connector, SwiftTxEventEndpoint.class).get(); + public void before() throws Exception { +// endpoint = clientManager.createClient(connector, SwiftTxEventEndpoint.class).get(); } @After public void after() throws Exception { - endpoint.close(); +// endpoint.close(); } @Test public void persistsEvent() throws Exception { - endpoint.handle(someEvent(TxStartedEvent)); +// endpoint.handle(someEvent(TxStartedEvent)); + stub.reportEvent(someGrpcEvent(TxStartedEvent)); TxEventEnvelope envelope = eventRepo.findByEventGlobalTxId(globalTxId); @@ -117,7 +127,8 @@ public class AlphaIntegrationTest { eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes())); eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0])); - endpoint.handle(someEvent(TxAbortedEvent)); +// endpoint.handle(someEvent(TxAbortedEvent)); + stub.reportEvent(someGrpcEvent(TxAbortedEvent)); await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1); assertThat(compensationContexts, containsInAnyOrder( @@ -137,6 +148,17 @@ public class AlphaIntegrationTest { payload.getBytes()); } + private GrpcTxEvent someGrpcEvent(EventType type) { + return GrpcTxEvent.newBuilder() + .setTimestamp(System.currentTimeMillis()) + .setGlobalTxId(this.globalTxId) + .setLocalTxId(this.localTxId) + .setParentTxId(this.parentTxId) + .setType(type.name()) + .setPayloads(ByteString.copyFrom(payload.getBytes())) + .build(); + } + private TxEventEnvelope eventEnvelopeOf(EventType eventType, byte[] payloads) { return eventEnvelopeOf(eventType, UUID.randomUUID().toString(), UUID.randomUUID().toString(), payloads); } diff --git a/omega/omega-spring-starter/pom.xml b/omega/omega-connector/omega-connector-grpc/pom.xml similarity index 59% copy from omega/omega-spring-starter/pom.xml copy to omega/omega-connector/omega-connector-grpc/pom.xml index 59b81f8..a6cf1a2 100644 --- a/omega/omega-spring-starter/pom.xml +++ b/omega/omega-connector/omega-connector-grpc/pom.xml @@ -1,5 +1,6 @@ <?xml version="1.0" encoding="UTF-8"?> <!-- + ~ ~ Licensed to the Apache Software Foundation (ASF) under one or more ~ contributor license agreements. See the NOTICE file distributed with ~ this work for additional information regarding copyright ownership. @@ -14,33 +15,59 @@ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the License for the specific language governing permissions and ~ limitations under the License. + ~ + ~ --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> - <artifactId>omega</artifactId> + <artifactId>omega-connector</artifactId> <groupId>io.servicecomb.saga</groupId> <version>0.0.3-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> - <artifactId>omega-spring-starter</artifactId> + <artifactId>omega-connector-grpc</artifactId> <dependencies> <dependency> - <groupId>io.servicecomb.saga</groupId> - <artifactId>omega-spring-tx</artifactId> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty</artifactId> </dependency> <dependency> <groupId>io.servicecomb.saga</groupId> - <artifactId>omega-connector-thrift</artifactId> + <artifactId>omega-transaction</artifactId> </dependency> <dependency> <groupId>io.servicecomb.saga</groupId> - <artifactId>omega-format</artifactId> + <artifactId>pack-contract-grpc</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> </dependency> - </dependencies> -</project> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + </dependency> + <dependency> + <groupId>com.github.seanyinx</groupId> + <artifactId>unit-scaffolding</artifactId> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java new file mode 100644 index 0000000..25f6223 --- /dev/null +++ b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package io.servicecomb.saga.omega.connector.grpc; + +import com.google.protobuf.ByteString; + +import io.servicecomb.saga.omega.transaction.MessageSender; +import io.servicecomb.saga.omega.transaction.MessageSerializer; +import io.servicecomb.saga.omega.transaction.TxEvent; +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent; +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent.Builder; +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint; + +public class GrpcClientMessageSender implements MessageSender { + + private final GrpcTxEventEndpoint eventService; + + private final MessageSerializer serializer; + + public GrpcClientMessageSender(GrpcTxEventEndpoint eventService, MessageSerializer serializer) { + this.eventService = eventService; + this.serializer = serializer; + } + + @Override + public void send(TxEvent event) { + eventService.reportEvent(convertEvent(event)); + } + + private GrpcTxEvent convertEvent(TxEvent event) { + ByteString payloads = ByteString.copyFrom(serializer.serialize(event.payloads())); + Builder builder = GrpcTxEvent.newBuilder() + .setTimestamp(event.timestamp()) + .setGlobalTxId(event.globalTxId()) + .setLocalTxId(event.localTxId()) + .setType(event.type()) + .setPayloads(payloads); + if (event.parentTxId() != null) { + builder.setParentTxId(event.parentTxId()); + } + return builder.build(); + } +} diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java similarity index 57% copy from omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java copy to omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java index b1eb7fd..b3f2b26 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java +++ b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -13,10 +14,26 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * */ -package io.servicecomb.saga.omega.transaction; +package io.servicecomb.saga.omega.connector.grpc; + +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent; +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint; +import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub; + +public class GrpcTxEventEndpointImpl implements GrpcTxEventEndpoint { + + private final TxEventServiceBlockingStub stub; + + public GrpcTxEventEndpointImpl(TxEventServiceBlockingStub stub) { + this.stub = stub; + } -public interface MessageSerializer { - byte[] serialize(TxEvent event); + @Override + public void reportEvent(GrpcTxEvent event) { + stub.reportEvent(event); + } } diff --git a/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java similarity index 55% copy from omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java copy to omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java index 05f984c..ca4f034 100644 --- a/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java +++ b/omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -13,9 +14,11 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * */ -package io.servicecomb.saga.omega.connector.thrift; +package io.servicecomb.saga.omega.connector.grpc; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static org.hamcrest.core.Is.is; @@ -28,53 +31,60 @@ import org.junit.Test; import io.servicecomb.saga.omega.transaction.MessageSerializer; import io.servicecomb.saga.omega.transaction.TxEvent; -import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent; -import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint; - -public class ThriftMessageSenderTest { +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent; +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint; +public class GrpcClientMessageSenderTest { private final String globalTxId = uniquify("global tx id"); + private final String localTxId = uniquify("local tx id"); + private final String parentTxId = uniquify("parent tx id"); + private final String payload1 = uniquify("payload1"); + private final String payload2 = uniquify("payload2"); - private SwiftTxEvent swiftTxEvent; + private GrpcTxEvent grpcTxEvent; - private final MessageSerializer serializer = (event) -> { - try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { - for (Object o : event.payloads()) { - stream.write(o.toString().getBytes()); - } - return stream.toByteArray(); - } catch (IOException e) { - throw new IllegalStateException(e); + private final MessageSerializer serializer = new MessageSerializer() { + @Override + public byte[] serialize(TxEvent event) { + return serialize(event.payloads()); } - }; - private final SwiftTxEventEndpoint eventService = new SwiftTxEventEndpoint() { @Override - public void handle(SwiftTxEvent message) { - swiftTxEvent = message; + public byte[] serialize(Object[] objects) { + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + for (Object o : objects) { + stream.write(o.toString().getBytes()); + } + return stream.toByteArray(); + } catch (IOException e) { + throw new IllegalStateException(e); + } } + }; + + private final GrpcTxEventEndpoint eventService = new GrpcTxEventEndpoint() { @Override - public void close() throws Exception { + public void reportEvent(GrpcTxEvent event) { + grpcTxEvent = event; } }; - private final ThriftMessageSender messageSender = new ThriftMessageSender(eventService, serializer); + private final GrpcClientMessageSender messageSender = new GrpcClientMessageSender(eventService, serializer); @Test public void sendSerializedEvent() throws Exception { - TxEvent event = new TxEvent(globalTxId, localTxId, parentTxId, getClass().getCanonicalName(), payload1, payload2); + TxEvent event = new TxEvent(globalTxId, localTxId, parentTxId, payload1, payload2); messageSender.send(event); - assertThat(swiftTxEvent.globalTxId(), is(event.globalTxId())); - assertThat(swiftTxEvent.localTxId(), is(event.localTxId())); - assertThat(swiftTxEvent.parentTxId(), is(event.parentTxId())); - assertThat(swiftTxEvent.compensationMethod(), is(event.compensationMethod())); - assertThat(swiftTxEvent.payloads(), is(serializer.serialize(event))); + assertThat(grpcTxEvent.getGlobalTxId(), is(event.globalTxId())); + assertThat(grpcTxEvent.getLocalTxId(), is(event.localTxId())); + assertThat(grpcTxEvent.getParentTxId(), is(event.parentTxId())); + assertThat(grpcTxEvent.getPayloads().toByteArray(), is(serializer.serialize(event))); } -} +} \ No newline at end of file diff --git a/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java b/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java index 05f984c..7b5288f 100644 --- a/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java +++ b/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java @@ -41,14 +41,22 @@ public class ThriftMessageSenderTest { private SwiftTxEvent swiftTxEvent; - private final MessageSerializer serializer = (event) -> { - try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { - for (Object o : event.payloads()) { - stream.write(o.toString().getBytes()); + private final MessageSerializer serializer = new MessageSerializer() { + @Override + public byte[] serialize(TxEvent event) { + return serialize(event.payloads()); + } + + @Override + public byte[] serialize(Object[] objects) { + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + for (Object o : objects) { + stream.write(o.toString().getBytes()); + } + return stream.toByteArray(); + } catch (IOException e) { + throw new IllegalStateException(e); } - return stream.toByteArray(); - } catch (IOException e) { - throw new IllegalStateException(e); } }; diff --git a/omega/omega-connector/pom.xml b/omega/omega-connector/pom.xml index 21203b3..63ceda6 100644 --- a/omega/omega-connector/pom.xml +++ b/omega/omega-connector/pom.xml @@ -30,6 +30,7 @@ <packaging>pom</packaging> <modules> <module>omega-connector-thrift</module> + <module>omega-connector-grpc</module> </modules> diff --git a/omega/omega-format/src/main/java/io/servicecomb/saga/omega/format/NativeMessageFormat.java b/omega/omega-format/src/main/java/io/servicecomb/saga/omega/format/NativeMessageFormat.java index 14b1d29..60b4a74 100644 --- a/omega/omega-format/src/main/java/io/servicecomb/saga/omega/format/NativeMessageFormat.java +++ b/omega/omega-format/src/main/java/io/servicecomb/saga/omega/format/NativeMessageFormat.java @@ -32,13 +32,22 @@ public class NativeMessageFormat implements MessageSerializer, MessageDeserializ @Override public byte[] serialize(TxEvent event) { try { + return serialize(event.payloads()); + } catch (OmegaException e) { + throw new OmegaException("Unable to serialize event with global tx id " + event.globalTxId(), e); + } + } + + @Override + public byte[] serialize(Object[] objects) { + try { ByteArrayOutputStream out = new ByteArrayOutputStream(); try (ObjectOutputStream outputStream = new ObjectOutputStream(out)) { - outputStream.writeObject(event.payloads()); + outputStream.writeObject(objects); return out.toByteArray(); } } catch (IOException e) { - throw new OmegaException("Unable to serialize event with global tx id " + event.globalTxId(), e); + throw new OmegaException("Unable to serialize object", e); } } diff --git a/omega/omega-spring-starter/pom.xml b/omega/omega-spring-starter/pom.xml index 59b81f8..40767ba 100644 --- a/omega/omega-spring-starter/pom.xml +++ b/omega/omega-spring-starter/pom.xml @@ -41,6 +41,18 @@ <groupId>io.servicecomb.saga</groupId> <artifactId>omega-format</artifactId> </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty</artifactId> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + </dependency> + <dependency> + <groupId>io.servicecomb.saga</groupId> + <artifactId>omega-connector-grpc</artifactId> + </dependency> </dependencies> </project> diff --git a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java index 5f62884..3f3460c 100644 --- a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java +++ b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java @@ -36,6 +36,10 @@ import org.springframework.context.annotation.Configuration; import com.facebook.nifty.client.FramedClientConnector; import com.facebook.swift.service.ThriftClientManager; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender; +import io.servicecomb.saga.omega.connector.grpc.GrpcTxEventEndpointImpl; import io.servicecomb.saga.omega.connector.thrift.ThriftMessageSender; import io.servicecomb.saga.omega.context.IdGenerator; import io.servicecomb.saga.omega.context.OmegaContext; @@ -43,6 +47,8 @@ import io.servicecomb.saga.omega.context.UniqueIdGenerator; import io.servicecomb.saga.omega.format.NativeMessageFormat; import io.servicecomb.saga.omega.transaction.MessageSender; import io.servicecomb.saga.omega.transaction.MessageSerializer; +import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc; +import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub; import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint; @Configuration @@ -51,6 +57,8 @@ class OmegaSpringConfig { private final ThriftClientManager clientManager = new ThriftClientManager(); private final List<AutoCloseable> closeables = new ArrayList<>(); + private ManagedChannel clientChannel; + @Bean IdGenerator<String> idGenerator() { return new UniqueIdGenerator(); @@ -61,7 +69,7 @@ class OmegaSpringConfig { return new OmegaContext(idGenerator); } - @Bean + // @Bean MessageSender messageSender(@Value("${alpha.cluster.address}") String[] addresses) { // TODO: 2017/12/26 connect to the one with lowest latency for (String address : addresses) { @@ -105,5 +113,29 @@ class OmegaSpringConfig { } clientManager.close(); + clientChannel.shutdown(); + } + + @Bean + MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses) { + // TODO: 2017/12/26 connect to the one with lowest latency + for (String address : addresses) { + try { + String[] pair = address.split(":"); + return createMessageSender(pair[0], Integer.parseInt(pair[1]), new NativeMessageFormat()); + } catch (Exception e) { + log.error("Unable to connect to alpha at {}", address, e); + } + } + + throw new IllegalArgumentException( + "None of the alpha cluster is reachable: " + Arrays.toString(addresses)); + } + + private GrpcClientMessageSender createMessageSender(String host, int port, MessageSerializer serializer) { + clientChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build(); + TxEventServiceBlockingStub stub = TxEventServiceGrpc.newBlockingStub(clientChannel); + GrpcTxEventEndpointImpl eventService = new GrpcTxEventEndpointImpl(stub); + return new GrpcClientMessageSender(eventService, serializer); } } diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java index b1eb7fd..2148eb6 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java @@ -19,4 +19,6 @@ package io.servicecomb.saga.omega.transaction; public interface MessageSerializer { byte[] serialize(TxEvent event); + + byte[] serialize(Object[] objects); } diff --git a/omega/omega-spring-starter/pom.xml b/pack-contracts/pack-contract-grpc/pom.xml similarity index 66% copy from omega/omega-spring-starter/pom.xml copy to pack-contracts/pack-contract-grpc/pom.xml index 59b81f8..388c5aa 100644 --- a/omega/omega-spring-starter/pom.xml +++ b/pack-contracts/pack-contract-grpc/pom.xml @@ -1,5 +1,6 @@ <?xml version="1.0" encoding="UTF-8"?> <!-- + ~ ~ Licensed to the Apache Software Foundation (ASF) under one or more ~ contributor license agreements. See the NOTICE file distributed with ~ this work for additional information regarding copyright ownership. @@ -14,33 +15,46 @@ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the License for the specific language governing permissions and ~ limitations under the License. + ~ + ~ --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> - <artifactId>omega</artifactId> + <artifactId>pack-contracts</artifactId> <groupId>io.servicecomb.saga</groupId> <version>0.0.3-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> - <artifactId>omega-spring-starter</artifactId> + <artifactId>pack-contract-grpc</artifactId> <dependencies> <dependency> - <groupId>io.servicecomb.saga</groupId> - <artifactId>omega-spring-tx</artifactId> - </dependency> - <dependency> - <groupId>io.servicecomb.saga</groupId> - <artifactId>omega-connector-thrift</artifactId> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> </dependency> <dependency> - <groupId>io.servicecomb.saga</groupId> - <artifactId>omega-format</artifactId> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> </dependency> </dependencies> -</project> + <build> + <extensions> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.5.0.Final</version> + </extension> + </extensions> + <plugins> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java b/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java similarity index 85% copy from omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java copy to pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java index b1eb7fd..32a3b6b 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java +++ b/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -13,10 +14,12 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * */ -package io.servicecomb.saga.omega.transaction; +package io.servicecomb.saga.pack.contract.grpc; -public interface MessageSerializer { - byte[] serialize(TxEvent event); +public interface GrpcTxEventEndpoint { + void reportEvent(GrpcTxEvent message); } diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto new file mode 100644 index 0000000..41ffb81 --- /dev/null +++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto @@ -0,0 +1,37 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "io.servicecomb.saga.pack.contract.grpc"; +option java_outer_classname = "TxEventProto"; + +service TxEventService { + rpc ReportEvent (GrpcTxEvent) returns (GrpcEmpty) {} +} + +message GrpcTxEvent { + int64 timestamp = 1; + string globalTxId = 2; + string localTxId = 3; + string parentTxId = 4; + string type = 5; + bytes payloads = 6; +} + +message GrpcEmpty {} \ No newline at end of file diff --git a/pack-contracts/pom.xml b/pack-contracts/pom.xml index cc035dd..b6ad03c 100644 --- a/pack-contracts/pom.xml +++ b/pack-contracts/pom.xml @@ -30,6 +30,7 @@ <packaging>pom</packaging> <modules> <module>pack-contract-thrift</module> + <module>pack-contract-grpc</module> </modules> </project> diff --git a/pom.xml b/pom.xml index 6668b59..8fb9f63 100755 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,7 @@ <akka.version>2.5.6</akka.version> <rat.version>0.12</rat.version> <maven.failsafe.version>2.19.1</maven.failsafe.version> + <grpc.version>1.8.0</grpc.version> </properties> <name>ServiceComb Saga</name> @@ -158,6 +159,11 @@ </dependency> <dependency> <groupId>io.servicecomb.saga</groupId> + <artifactId>omega-connector-grpc</artifactId> + <version>0.0.3-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>io.servicecomb.saga</groupId> <artifactId>omega-spring-starter</artifactId> <version>0.0.3-SNAPSHOT</version> </dependency> @@ -182,6 +188,11 @@ <version>0.0.3-SNAPSHOT</version> </dependency> <dependency> + <groupId>io.servicecomb.saga</groupId> + <artifactId>pack-contract-grpc</artifactId> + <version>0.0.3-SNAPSHOT</version> + </dependency> + <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.4</version> @@ -321,6 +332,21 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + <version>${grpc.version}</version> + </dependency> <!-- test dependencies --> <dependency> @@ -528,6 +554,24 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <version>0.5.0</version> + <configuration> + <protocArtifact>com.google.protobuf:protoc:3.5.0:exe:${os.detected.classifier}</protocArtifact> + <pluginId>grpc-java</pluginId> + <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.8.0:exe:${os.detected.classifier}</pluginArtifact> + </configuration> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>compile-custom</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </pluginManagement> <!-- enable the rat check by default --> -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
