seanyinx closed pull request #93: [WIP]SCB-138 replace thrift with grpc URL: https://github.com/apache/incubator-servicecomb-saga/pull/93
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign pull request (one opened from a fork) once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/alpha/alpha-server/pom.xml b/alpha/alpha-server/pom.xml index 774883d5..3af09c2a 100644 --- a/alpha/alpha-server/pom.xml +++ b/alpha/alpha-server/pom.xml @@ -42,16 +42,16 @@ <dependencies> <dependency> - <groupId>org.apache.thrift</groupId> - <artifactId>libthrift</artifactId> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> </dependency> <dependency> - <groupId>com.facebook.swift</groupId> - <artifactId>swift-service</artifactId> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty</artifactId> </dependency> <dependency> <groupId>io.servicecomb.saga</groupId> - <artifactId>pack-contract-thrift</artifactId> + <artifactId>pack-contract-grpc</artifactId> </dependency> <dependency> <groupId>io.servicecomb.saga</groupId> @@ -84,17 +84,10 @@ <groupId>org.awaitility</groupId> <artifactId>awaitility</artifactId> </dependency> - </dependencies> <build> <plugins> -<!-- - <plugin> - <groupId>com.facebook.mojo</groupId> - <artifactId>swift-maven-plugin</artifactId> - </plugin> ---> <!-- mixin plugin configurations declared in another pom, just like importing dependencies managed in another pom --> <plugin> diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java index d443fa71..eca48bce 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java @@ -17,8 +17,6 @@ package io.servicecomb.saga.alpha.server; -import java.util.concurrent.CompletableFuture; - import org.springframework.beans.factory.annotation.Value; import 
org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -43,15 +41,18 @@ TxEventRepository springTxEventRepository(@Value("${alpha.server.port:8080}") in TxEventRepository eventRepository = new SpringTxEventRepository(eventRepo); - ThriftStartable startable = new ThriftStartable( + ServerStartable startable = buildGrpc(port, omegaCallback, eventRepository); + new Thread(startable::start).start(); + + return eventRepository; + } + + private ServerStartable buildGrpc(int port, OmegaCallback omegaCallback, TxEventRepository eventRepository) { + return new GrpcStartable( port, - new SwiftTxEventEndpointImpl( + new GrpcTxEventEndpointImpl( new TxConsistentService( eventRepository, omegaCallback))); - - CompletableFuture.runAsync(startable::start); - - return eventRepository; } } diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcStartable.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcStartable.java new file mode 100644 index 00000000..663de71b --- /dev/null +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcStartable.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package io.servicecomb.saga.alpha.server; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.Arrays; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.grpc.BindableService; +import io.grpc.Server; +import io.grpc.ServerBuilder; + +class GrpcStartable implements ServerStartable { + + private static Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final Server server; + + GrpcStartable(int port, BindableService... services) { + ServerBuilder<?> serverBuilder = ServerBuilder.forPort(port); + Arrays.stream(services).forEach(serverBuilder::addService); + server = serverBuilder.build(); + } + + @Override + public void start() { + Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown)); + + try { + server.start(); + server.awaitTermination(); + } catch (IOException e) { + throw new IllegalStateException("Unable to start grpc server.", e); + } catch (InterruptedException e) { + LOG.error("grpc server was interrupted.", e); + Thread.currentThread().interrupt(); + } + } +} diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java similarity index 56% rename from alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java rename to alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java index f1f8e40f..76ab3467 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/SwiftTxEventEndpointImpl.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license 
agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -13,40 +14,42 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * */ package io.servicecomb.saga.alpha.server; import java.util.Date; +import io.grpc.stub.StreamObserver; import io.servicecomb.saga.alpha.core.TxConsistentService; import io.servicecomb.saga.alpha.core.TxEvent; -import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent; -import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint; +import io.servicecomb.saga.pack.contract.grpc.GrpcEmpty; +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent; +import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceImplBase; -class SwiftTxEventEndpointImpl implements SwiftTxEventEndpoint { +class GrpcTxEventEndpointImpl extends TxEventServiceImplBase { private final TxConsistentService txConsistentService; - SwiftTxEventEndpointImpl(TxConsistentService txConsistentService) { + GrpcTxEventEndpointImpl(TxConsistentService txConsistentService) { this.txConsistentService = txConsistentService; } @Override - public void handle(SwiftTxEvent message) { + public void reportEvent(GrpcTxEvent message, StreamObserver<GrpcEmpty> responseObserver) { txConsistentService.handle(new TxEvent( - new Date(message.timestamp()), - message.globalTxId(), - message.localTxId(), - message.parentTxId(), - message.type(), - message.compensationMethod(), - message.payloads() + new Date(message.getTimestamp()), + message.getGlobalTxId(), + message.getLocalTxId(), + message.getParentTxId().isEmpty()? 
null : message.getParentTxId(), + message.getType(), + message.getCompensationMethod(), + message.getPayloads().toByteArray() )); - } - - @Override - public void close() throws Exception { - + GrpcEmpty reply = GrpcEmpty.newBuilder().build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); } } diff --git a/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/ServerStartable.java similarity index 72% rename from pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java rename to alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/ServerStartable.java index ae1fde93..4657cecc 100644 --- a/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEventEndpoint.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/ServerStartable.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -13,16 +14,12 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
+ * + * */ -package io.servicecomb.saga.pack.contracts.thrift; - -import com.facebook.swift.service.ThriftMethod; -import com.facebook.swift.service.ThriftService; - -@ThriftService("TxEventEndpoint") -public interface SwiftTxEventEndpoint extends AutoCloseable { +package io.servicecomb.saga.alpha.server; - @ThriftMethod - void handle(SwiftTxEvent message); +interface ServerStartable { + void start(); } diff --git a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java index f7ec33cb..56a57eb0 100644 --- a/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java +++ b/alpha/alpha-server/src/test/java/io/servicecomb/saga/alpha/server/AlphaIntegrationTest.java @@ -17,7 +17,6 @@ package io.servicecomb.saga.alpha.server; -import static com.google.common.net.HostAndPort.fromParts; import static io.servicecomb.saga.alpha.core.EventType.TxAbortedEvent; import static io.servicecomb.saga.alpha.core.EventType.TxEndedEvent; import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent; @@ -34,9 +33,7 @@ import java.util.Objects; import java.util.UUID; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -45,23 +42,29 @@ import org.springframework.context.annotation.Configuration; import org.springframework.test.context.junit4.SpringRunner; -import com.facebook.nifty.client.FramedClientConnector; -import com.facebook.swift.service.ThriftClientManager; +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import io.servicecomb.saga.alpha.core.EventType; import io.servicecomb.saga.alpha.core.OmegaCallback; import io.servicecomb.saga.alpha.core.TxEvent; import 
io.servicecomb.saga.alpha.server.AlphaIntegrationTest.OmegaCallbackConfig; -import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent; -import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint; +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent; +import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc; +import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub; @RunWith(SpringRunner.class) @SpringBootTest(classes = {AlphaApplication.class, OmegaCallbackConfig.class}, properties = "alpha.server.port=8090") public class AlphaIntegrationTest { - private static final ThriftClientManager clientManager = new ThriftClientManager(); - private static final String payload = "hello world"; + private static final int port = 8090; + + private static ManagedChannel clientChannel = ManagedChannelBuilder + .forAddress("localhost", port).usePlaintext(true).build(); - private final int port = 8090; + private TxEventServiceBlockingStub stub = TxEventServiceGrpc.newBlockingStub(clientChannel); + + private static final String payload = "hello world"; private final String globalTxId = UUID.randomUUID().toString(); private final String localTxId = UUID.randomUUID().toString(); @@ -74,27 +77,14 @@ @Autowired private List<CompensationContext> compensationContexts; - private final FramedClientConnector connector = new FramedClientConnector(fromParts("localhost", port)); - private SwiftTxEventEndpoint endpoint; - @AfterClass public static void tearDown() throws Exception { - clientManager.close(); - } - - @Before - public void setUp() throws Exception { - endpoint = clientManager.createClient(connector, SwiftTxEventEndpoint.class).get(); - } - - @After - public void after() throws Exception { - endpoint.close(); + clientChannel.shutdown(); } @Test public void persistsEvent() throws Exception { - endpoint.handle(someEvent(TxStartedEvent)); + stub.reportEvent(someGrpcEvent(TxStartedEvent)); TxEventEnvelope envelope = 
eventRepo.findByEventGlobalTxId(globalTxId); @@ -117,7 +107,7 @@ public void doNotCompensateDuplicateTxOnFailure() throws Exception { eventRepo.save(eventEnvelopeOf(TxStartedEvent, localTxId1, UUID.randomUUID().toString(), "service b".getBytes())); eventRepo.save(eventEnvelopeOf(TxEndedEvent, new byte[0])); - endpoint.handle(someEvent(TxAbortedEvent)); + stub.reportEvent(someGrpcEvent(TxAbortedEvent)); await().atMost(1, SECONDS).until(() -> compensationContexts.size() > 1); assertThat(compensationContexts, containsInAnyOrder( @@ -126,15 +116,16 @@ public void doNotCompensateDuplicateTxOnFailure() throws Exception { )); } - private SwiftTxEvent someEvent(EventType type) { - return new SwiftTxEvent( - System.currentTimeMillis(), - this.globalTxId, - this.localTxId, - this.parentTxId, - type.name(), - compensationMethod, - payload.getBytes()); + private GrpcTxEvent someGrpcEvent(EventType type) { + return GrpcTxEvent.newBuilder() + .setTimestamp(System.currentTimeMillis()) + .setGlobalTxId(this.globalTxId) + .setLocalTxId(this.localTxId) + .setParentTxId(this.parentTxId) + .setType(type.name()) + .setCompensationMethod(getClass().getCanonicalName()) + .setPayloads(ByteString.copyFrom(payload.getBytes())) + .build(); } private TxEventEnvelope eventEnvelopeOf(EventType eventType, byte[] payloads) { diff --git a/omega/omega-connector/omega-connector-thrift/pom.xml b/omega/omega-connector/omega-connector-grpc/pom.xml similarity index 84% rename from omega/omega-connector/omega-connector-thrift/pom.xml rename to omega/omega-connector/omega-connector-grpc/pom.xml index ae64b4a1..a6cf1a29 100644 --- a/omega/omega-connector/omega-connector-thrift/pom.xml +++ b/omega/omega-connector/omega-connector-grpc/pom.xml @@ -1,5 +1,6 @@ <?xml version="1.0" encoding="UTF-8"?> <!-- + ~ ~ Licensed to the Apache Software Foundation (ASF) under one or more ~ contributor license agreements. 
See the NOTICE file distributed with ~ this work for additional information regarding copyright ownership. @@ -14,6 +15,8 @@ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the License for the specific language governing permissions and ~ limitations under the License. + ~ + ~ --> <project xmlns="http://maven.apache.org/POM/4.0.0" @@ -26,24 +29,28 @@ </parent> <modelVersion>4.0.0</modelVersion> - <artifactId>omega-connector-thrift</artifactId> + <artifactId>omega-connector-grpc</artifactId> <dependencies> <dependency> - <groupId>org.apache.thrift</groupId> - <artifactId>libthrift</artifactId> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> </dependency> <dependency> - <groupId>com.facebook.swift</groupId> - <artifactId>swift-service</artifactId> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty</artifactId> </dependency> <dependency> <groupId>io.servicecomb.saga</groupId> - <artifactId>pack-contract-thrift</artifactId> + <artifactId>omega-transaction</artifactId> </dependency> <dependency> <groupId>io.servicecomb.saga</groupId> - <artifactId>omega-transaction</artifactId> + <artifactId>pack-contract-grpc</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> </dependency> <dependency> @@ -63,5 +70,4 @@ <artifactId>unit-scaffolding</artifactId> </dependency> </dependencies> - -</project> +</project> \ No newline at end of file diff --git a/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java similarity index 52% rename from omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java rename to 
omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java index 44499b32..25f62231 100644 --- a/omega/omega-connector/omega-connector-thrift/src/main/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSender.java +++ b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -13,40 +14,48 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * */ -package io.servicecomb.saga.omega.connector.thrift; +package io.servicecomb.saga.omega.connector.grpc; + +import com.google.protobuf.ByteString; import io.servicecomb.saga.omega.transaction.MessageSender; import io.servicecomb.saga.omega.transaction.MessageSerializer; import io.servicecomb.saga.omega.transaction.TxEvent; -import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent; -import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint; +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent; +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent.Builder; +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint; + +public class GrpcClientMessageSender implements MessageSender { + + private final GrpcTxEventEndpoint eventService; -public class ThriftMessageSender implements MessageSender, AutoCloseable { - private final SwiftTxEventEndpoint eventService; private final MessageSerializer serializer; - public ThriftMessageSender(SwiftTxEventEndpoint eventService, MessageSerializer serializer) { + public GrpcClientMessageSender(GrpcTxEventEndpoint eventService, MessageSerializer 
serializer) { this.eventService = eventService; this.serializer = serializer; } @Override public void send(TxEvent event) { - eventService.handle(new SwiftTxEvent( - event.timestamp(), - event.globalTxId(), - event.localTxId(), - event.parentTxId(), - event.type(), - event.compensationMethod(), - serializer.serialize(event) - )); + eventService.reportEvent(convertEvent(event)); } - @Override - public void close() throws Exception { - eventService.close(); + private GrpcTxEvent convertEvent(TxEvent event) { + ByteString payloads = ByteString.copyFrom(serializer.serialize(event.payloads())); + Builder builder = GrpcTxEvent.newBuilder() + .setTimestamp(event.timestamp()) + .setGlobalTxId(event.globalTxId()) + .setLocalTxId(event.localTxId()) + .setType(event.type()) + .setPayloads(payloads); + if (event.parentTxId() != null) { + builder.setParentTxId(event.parentTxId()); + } + return builder.build(); } } diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/ThriftStartable.java b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java similarity index 53% rename from alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/ThriftStartable.java rename to omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java index 71acc2f8..b3f2b260 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/ThriftStartable.java +++ b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -13,31 +14,26 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and * limitations under the License. + * + * */ -package io.servicecomb.saga.alpha.server; +package io.servicecomb.saga.omega.connector.grpc; -import java.util.Collections; +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent; +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint; +import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub; -import com.facebook.swift.codec.ThriftCodecManager; -import com.facebook.swift.service.ThriftServer; -import com.facebook.swift.service.ThriftServerConfig; -import com.facebook.swift.service.ThriftServiceProcessor; +public class GrpcTxEventEndpointImpl implements GrpcTxEventEndpoint { -class ThriftStartable { - private final ThriftServer server; + private final TxEventServiceBlockingStub stub; - ThriftStartable(int port, Object... services) { - server = new ThriftServer( - new ThriftServiceProcessor(new ThriftCodecManager(), - Collections.emptyList(), - services), - new ThriftServerConfig().setPort(port)); + public GrpcTxEventEndpointImpl(TxEventServiceBlockingStub stub) { + this.stub = stub; } - void start() { - Runtime.getRuntime().addShutdownHook(new Thread(server::close)); - - server.start(); + @Override + public void reportEvent(GrpcTxEvent event) { + stub.reportEvent(event); } } diff --git a/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java similarity index 55% rename from omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java rename to omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java index 05f984c5..ca4f0340 100644 --- 
a/omega/omega-connector/omega-connector-thrift/src/test/java/io/servicecomb/saga/omega/connector/thrift/ThriftMessageSenderTest.java +++ b/omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java @@ -1,4 +1,5 @@ /* + * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -13,9 +14,11 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * */ -package io.servicecomb.saga.omega.connector.thrift; +package io.servicecomb.saga.omega.connector.grpc; import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; import static org.hamcrest.core.Is.is; @@ -28,53 +31,60 @@ import io.servicecomb.saga.omega.transaction.MessageSerializer; import io.servicecomb.saga.omega.transaction.TxEvent; -import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEvent; -import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint; - -public class ThriftMessageSenderTest { +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent; +import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint; +public class GrpcClientMessageSenderTest { private final String globalTxId = uniquify("global tx id"); + private final String localTxId = uniquify("local tx id"); + private final String parentTxId = uniquify("parent tx id"); + private final String payload1 = uniquify("payload1"); + private final String payload2 = uniquify("payload2"); - private SwiftTxEvent swiftTxEvent; + private GrpcTxEvent grpcTxEvent; - private final MessageSerializer serializer = (event) -> { - try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { - for (Object o : event.payloads()) { - stream.write(o.toString().getBytes()); - } - 
return stream.toByteArray(); - } catch (IOException e) { - throw new IllegalStateException(e); + private final MessageSerializer serializer = new MessageSerializer() { + @Override + public byte[] serialize(TxEvent event) { + return serialize(event.payloads()); } - }; - private final SwiftTxEventEndpoint eventService = new SwiftTxEventEndpoint() { @Override - public void handle(SwiftTxEvent message) { - swiftTxEvent = message; + public byte[] serialize(Object[] objects) { + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + for (Object o : objects) { + stream.write(o.toString().getBytes()); + } + return stream.toByteArray(); + } catch (IOException e) { + throw new IllegalStateException(e); + } } + }; + + private final GrpcTxEventEndpoint eventService = new GrpcTxEventEndpoint() { @Override - public void close() throws Exception { + public void reportEvent(GrpcTxEvent event) { + grpcTxEvent = event; } }; - private final ThriftMessageSender messageSender = new ThriftMessageSender(eventService, serializer); + private final GrpcClientMessageSender messageSender = new GrpcClientMessageSender(eventService, serializer); @Test public void sendSerializedEvent() throws Exception { - TxEvent event = new TxEvent(globalTxId, localTxId, parentTxId, getClass().getCanonicalName(), payload1, payload2); + TxEvent event = new TxEvent(globalTxId, localTxId, parentTxId, payload1, payload2); messageSender.send(event); - assertThat(swiftTxEvent.globalTxId(), is(event.globalTxId())); - assertThat(swiftTxEvent.localTxId(), is(event.localTxId())); - assertThat(swiftTxEvent.parentTxId(), is(event.parentTxId())); - assertThat(swiftTxEvent.compensationMethod(), is(event.compensationMethod())); - assertThat(swiftTxEvent.payloads(), is(serializer.serialize(event))); + assertThat(grpcTxEvent.getGlobalTxId(), is(event.globalTxId())); + assertThat(grpcTxEvent.getLocalTxId(), is(event.localTxId())); + assertThat(grpcTxEvent.getParentTxId(), is(event.parentTxId())); + 
assertThat(grpcTxEvent.getPayloads().toByteArray(), is(serializer.serialize(event))); } -} +} \ No newline at end of file diff --git a/omega/omega-connector/pom.xml b/omega/omega-connector/pom.xml index 21203b36..9d0c4e70 100644 --- a/omega/omega-connector/pom.xml +++ b/omega/omega-connector/pom.xml @@ -29,7 +29,7 @@ <artifactId>omega-connector</artifactId> <packaging>pom</packaging> <modules> - <module>omega-connector-thrift</module> + <module>omega-connector-grpc</module> </modules> diff --git a/omega/omega-format/src/main/java/io/servicecomb/saga/omega/format/NativeMessageFormat.java b/omega/omega-format/src/main/java/io/servicecomb/saga/omega/format/NativeMessageFormat.java index 14b1d29a..60b4a74b 100644 --- a/omega/omega-format/src/main/java/io/servicecomb/saga/omega/format/NativeMessageFormat.java +++ b/omega/omega-format/src/main/java/io/servicecomb/saga/omega/format/NativeMessageFormat.java @@ -31,14 +31,23 @@ public class NativeMessageFormat implements MessageSerializer, MessageDeserializer { @Override public byte[] serialize(TxEvent event) { + try { + return serialize(event.payloads()); + } catch (OmegaException e) { + throw new OmegaException("Unable to serialize event with global tx id " + event.globalTxId(), e); + } + } + + @Override + public byte[] serialize(Object[] objects) { try { ByteArrayOutputStream out = new ByteArrayOutputStream(); try (ObjectOutputStream outputStream = new ObjectOutputStream(out)) { - outputStream.writeObject(event.payloads()); + outputStream.writeObject(objects); return out.toByteArray(); } } catch (IOException e) { - throw new OmegaException("Unable to serialize event with global tx id " + event.globalTxId(), e); + throw new OmegaException("Unable to serialize object", e); } } diff --git a/omega/omega-spring-starter/pom.xml b/omega/omega-spring-starter/pom.xml index 59b81f88..b0f28540 100644 --- a/omega/omega-spring-starter/pom.xml +++ b/omega/omega-spring-starter/pom.xml @@ -35,11 +35,19 @@ </dependency> <dependency> 
<groupId>io.servicecomb.saga</groupId> - <artifactId>omega-connector-thrift</artifactId> + <artifactId>omega-format</artifactId> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty</artifactId> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> </dependency> <dependency> <groupId>io.servicecomb.saga</groupId> - <artifactId>omega-format</artifactId> + <artifactId>omega-connector-grpc</artifactId> </dependency> </dependencies> diff --git a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java index 5f628844..9b749fd5 100644 --- a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java +++ b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java @@ -17,13 +17,8 @@ package io.servicecomb.saga.omega.spring; -import static com.google.common.net.HostAndPort.fromParts; - import java.lang.invoke.MethodHandles; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; -import java.util.concurrent.ExecutionException; import javax.annotation.PreDestroy; @@ -33,23 +28,24 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import com.facebook.nifty.client.FramedClientConnector; -import com.facebook.swift.service.ThriftClientManager; - -import io.servicecomb.saga.omega.connector.thrift.ThriftMessageSender; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender; +import io.servicecomb.saga.omega.connector.grpc.GrpcTxEventEndpointImpl; import io.servicecomb.saga.omega.context.IdGenerator; import io.servicecomb.saga.omega.context.OmegaContext; import io.servicecomb.saga.omega.context.UniqueIdGenerator; import 
io.servicecomb.saga.omega.format.NativeMessageFormat; import io.servicecomb.saga.omega.transaction.MessageSender; import io.servicecomb.saga.omega.transaction.MessageSerializer; -import io.servicecomb.saga.pack.contracts.thrift.SwiftTxEventEndpoint; +import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc; +import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub; @Configuration class OmegaSpringConfig { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final ThriftClientManager clientManager = new ThriftClientManager(); - private final List<AutoCloseable> closeables = new ArrayList<>(); + + private ManagedChannel clientChannel; @Bean IdGenerator<String> idGenerator() { @@ -61,15 +57,18 @@ OmegaContext omegaContext(IdGenerator<String> idGenerator) { return new OmegaContext(idGenerator); } + @PreDestroy + void close() { + clientChannel.shutdown(); + } + @Bean - MessageSender messageSender(@Value("${alpha.cluster.address}") String[] addresses) { + MessageSender grpcMessageSender(@Value("${alpha.cluster.address}") String[] addresses) { // TODO: 2017/12/26 connect to the one with lowest latency for (String address : addresses) { try { String[] pair = address.split(":"); - ThriftMessageSender sender = createMessageSender(clientManager, pair[0], Integer.parseInt(pair[1]), new NativeMessageFormat()); - closeables.add(sender); - return sender; + return createMessageSender(pair[0], Integer.parseInt(pair[1]), new NativeMessageFormat()); } catch (Exception e) { log.error("Unable to connect to alpha at {}", address, e); } @@ -79,31 +78,10 @@ MessageSender messageSender(@Value("${alpha.cluster.address}") String[] addresse "None of the alpha cluster is reachable: " + Arrays.toString(addresses)); } - private ThriftMessageSender createMessageSender(ThriftClientManager clientManager, - String host, - int port, - MessageSerializer serializer) { - - FramedClientConnector connector = 
new FramedClientConnector(fromParts(host, port)); - - try { - SwiftTxEventEndpoint endpoint = clientManager.createClient(connector, SwiftTxEventEndpoint.class).get(); - return new ThriftMessageSender(endpoint, serializer); - } catch (InterruptedException | ExecutionException e) { - throw new IllegalStateException("Failed to create transaction event endpoint client to " + host + ":" + port, e); - } - } - - @PreDestroy - void close() { - for (AutoCloseable closeable : closeables) { - try { - closeable.close(); - } catch (Exception e) { - log.warn("Failed to close message sender", e); - } - } - - clientManager.close(); + private GrpcClientMessageSender createMessageSender(String host, int port, MessageSerializer serializer) { + clientChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build(); + TxEventServiceBlockingStub stub = TxEventServiceGrpc.newBlockingStub(clientChannel); + GrpcTxEventEndpointImpl eventService = new GrpcTxEventEndpointImpl(stub); + return new GrpcClientMessageSender(eventService, serializer); } } diff --git a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java index b1eb7fd2..2148eb6e 100644 --- a/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java +++ b/omega/omega-transaction/src/main/java/io/servicecomb/saga/omega/transaction/MessageSerializer.java @@ -19,4 +19,6 @@ public interface MessageSerializer { byte[] serialize(TxEvent event); + + byte[] serialize(Object[] objects); } diff --git a/pack-contracts/pack-contract-thrift/pom.xml b/pack-contracts/pack-contract-grpc/pom.xml similarity index 69% rename from pack-contracts/pack-contract-thrift/pom.xml rename to pack-contracts/pack-contract-grpc/pom.xml index 051aaf17..388c5aa7 100644 --- a/pack-contracts/pack-contract-thrift/pom.xml +++ b/pack-contracts/pack-contract-grpc/pom.xml 
@@ -1,5 +1,6 @@ <?xml version="1.0" encoding="UTF-8"?> <!-- + ~ ~ Licensed to the Apache Software Foundation (ASF) under one or more ~ contributor license agreements. See the NOTICE file distributed with ~ this work for additional information regarding copyright ownership. @@ -14,6 +15,8 @@ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the License for the specific language governing permissions and ~ limitations under the License. + ~ + ~ --> <project xmlns="http://maven.apache.org/POM/4.0.0" @@ -26,17 +29,32 @@ </parent> <modelVersion>4.0.0</modelVersion> - <artifactId>pack-contract-thrift</artifactId> + <artifactId>pack-contract-grpc</artifactId> <dependencies> <dependency> - <groupId>org.apache.thrift</groupId> - <artifactId>libthrift</artifactId> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> </dependency> <dependency> - <groupId>com.facebook.swift</groupId> - <artifactId>swift-service</artifactId> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> </dependency> </dependencies> -</project> + <build> + <extensions> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.5.0.Final</version> + </extension> + </extensions> + <plugins> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file diff --git a/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java b/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java new file mode 100644 index 00000000..32a3b6bf --- /dev/null +++ b/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java @@ -0,0 +1,25 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ + +package io.servicecomb.saga.pack.contract.grpc; + +public interface GrpcTxEventEndpoint { + void reportEvent(GrpcTxEvent message); +} diff --git a/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto new file mode 100644 index 00000000..f6ebf741 --- /dev/null +++ b/pack-contracts/pack-contract-grpc/src/main/proto/GrpcTxEvent.proto @@ -0,0 +1,38 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// + +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "io.servicecomb.saga.pack.contract.grpc"; +option java_outer_classname = "TxEventProto"; + +service TxEventService { + rpc ReportEvent (GrpcTxEvent) returns (GrpcEmpty) {} +} + +message GrpcTxEvent { + int64 timestamp = 1; + string globalTxId = 2; + string localTxId = 3; + string parentTxId = 4; + string type = 5; + string compensationMethod = 6; + bytes payloads = 7; +} + +message GrpcEmpty {} diff --git a/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEvent.java b/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEvent.java deleted file mode 100644 index 93951423..00000000 --- a/pack-contracts/pack-contract-thrift/src/main/java/io/servicecomb/saga/pack/contracts/thrift/SwiftTxEvent.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.servicecomb.saga.pack.contracts.thrift; - -import com.facebook.swift.codec.ThriftConstructor; -import com.facebook.swift.codec.ThriftField; -import com.facebook.swift.codec.ThriftStruct; - -@ThriftStruct("TxEvent") -public class SwiftTxEvent { - private final long timestamp; - private final String globalTxId; - private final String localTxId; - private final String parentTxId; - private final String type; - private final byte[] payloads; - private final String compensationMethod; - - @ThriftConstructor - public SwiftTxEvent(long timestamp, - String globalTxId, - String localTxId, - String parentTxId, - String type, - String compensationMethod, - byte[] payloads) { - this.timestamp = timestamp; - this.globalTxId = globalTxId; - this.localTxId = localTxId; - this.parentTxId = parentTxId; - this.type = type; - this.payloads = payloads; - this.compensationMethod = compensationMethod; - } - - @ThriftField(1) - public long timestamp() { - return timestamp; - } - - @ThriftField(2) - public String globalTxId() { - return globalTxId; - } - - @ThriftField(3) - public String localTxId() { - return localTxId; - } - - @ThriftField(4) - public String parentTxId() { - return parentTxId; - } - - @ThriftField(5) - public String type() { - return type; - } - - @ThriftField(6) - public String compensationMethod() { - return compensationMethod; - } - - @ThriftField(7) - public byte[] payloads() { - return payloads; - } -} diff --git a/pack-contracts/pom.xml b/pack-contracts/pom.xml index cc035dd9..dc2acd86 100644 --- a/pack-contracts/pom.xml +++ b/pack-contracts/pom.xml @@ -29,7 +29,7 @@ <artifactId>pack-contracts</artifactId> <packaging>pom</packaging> <modules> - <module>pack-contract-thrift</module> + <module>pack-contract-grpc</module> </modules> </project> diff --git a/pom.xml b/pom.xml index 6668b59f..45cb12c3 100755 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,7 @@ <akka.version>2.5.6</akka.version> <rat.version>0.12</rat.version> 
<maven.failsafe.version>2.19.1</maven.failsafe.version> + <grpc.version>1.8.0</grpc.version> </properties> <name>ServiceComb Saga</name> @@ -153,7 +154,7 @@ </dependency> <dependency> <groupId>io.servicecomb.saga</groupId> - <artifactId>omega-connector-thrift</artifactId> + <artifactId>omega-connector-grpc</artifactId> <version>0.0.3-SNAPSHOT</version> </dependency> <dependency> @@ -178,7 +179,7 @@ </dependency> <dependency> <groupId>io.servicecomb.saga</groupId> - <artifactId>pack-contract-thrift</artifactId> + <artifactId>pack-contract-grpc</artifactId> <version>0.0.3-SNAPSHOT</version> </dependency> <dependency> @@ -302,24 +303,19 @@ <version>${akka.version}</version> </dependency> <dependency> - <groupId>org.apache.thrift</groupId> - <artifactId>libthrift</artifactId> - <version>0.10.0</version> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty</artifactId> + <version>${grpc.version}</version> </dependency> <dependency> - <groupId>com.facebook.swift</groupId> - <artifactId>swift-service</artifactId> - <version>0.23.1</version> - <exclusions> - <exclusion> - <groupId>javax.validation</groupId> - <artifactId>validation-api</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.bval</groupId> - <artifactId>bval-jsr303</artifactId> - </exclusion> - </exclusions> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + <version>${grpc.version}</version> </dependency> <!-- test dependencies --> @@ -517,13 +513,19 @@ <version>2.8.2</version> </plugin> <plugin> - <groupId>com.facebook.mojo</groupId> - <artifactId>swift-maven-plugin</artifactId> - <version>0.23.1</version> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <version>0.5.0</version> + <configuration> + <protocArtifact>com.google.protobuf:protoc:3.5.0:exe:${os.detected.classifier}</protocArtifact> + 
<pluginId>grpc-java</pluginId> + <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact> + </configuration> <executions> <execution> <goals> - <goal>generate</goal> + <goal>compile</goal> + <goal>compile-custom</goal> </goals> </execution> </executions> ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services