This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 5328f26bb41b8799b76bffb7655fa758107fb294 Author: seanyinx <sean....@huawei.com> AuthorDate: Fri Dec 29 17:47:13 2017 +0800 SCB-149 removed unnecessary endpoint interface Signed-off-by: seanyinx <sean....@huawei.com> --- .../saga/alpha/server/GrpcTxEventEndpointImpl.java | 1 + integration-tests/coverage-aggregate/pom.xml | 4 + .../connector/grpc/GrpcClientMessageSender.java | 12 ++- .../connector/grpc/GrpcTxEventEndpointImpl.java | 39 ---------- .../grpc/GrpcClientMessageSenderTest.java | 90 ---------------------- .../saga/omega/spring/OmegaSpringConfig.java | 27 +++---- .../pack/contract/grpc/GrpcTxEventEndpoint.java | 25 ------ 7 files changed, 27 insertions(+), 171 deletions(-) diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java index 76ab346..42c597e 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java @@ -48,6 +48,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase { message.getCompensationMethod(), message.getPayloads().toByteArray() )); + GrpcEmpty reply = GrpcEmpty.newBuilder().build(); responseObserver.onNext(reply); responseObserver.onCompleted(); diff --git a/integration-tests/coverage-aggregate/pom.xml b/integration-tests/coverage-aggregate/pom.xml index ffebdee..72a172d 100644 --- a/integration-tests/coverage-aggregate/pom.xml +++ b/integration-tests/coverage-aggregate/pom.xml @@ -69,6 +69,10 @@ </dependency> <dependency> <groupId>io.servicecomb.saga</groupId> + <artifactId>omega-connector-grpc</artifactId> + </dependency> + <dependency> + <groupId>io.servicecomb.saga</groupId> <artifactId>alpha-server</artifactId> </dependency> <dependency> diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java index 25f6223..16f94b3 100644 --- a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java +++ b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java @@ -22,21 +22,23 @@ package io.servicecomb.saga.omega.connector.grpc; import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; import io.servicecomb.saga.omega.transaction.MessageSender; import io.servicecomb.saga.omega.transaction.MessageSerializer; import io.servicecomb.saga.omega.transaction.TxEvent; import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent; import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent.Builder; -import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint; +import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc; +import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub; public class GrpcClientMessageSender implements MessageSender { - private final GrpcTxEventEndpoint eventService; + private final TxEventServiceBlockingStub eventService; private final MessageSerializer serializer; - public GrpcClientMessageSender(GrpcTxEventEndpoint eventService, MessageSerializer serializer) { - this.eventService = eventService; + public GrpcClientMessageSender(ManagedChannel eventService, MessageSerializer serializer) { + this.eventService = TxEventServiceGrpc.newBlockingStub(eventService); this.serializer = serializer; } @@ -47,12 +49,14 @@ public class GrpcClientMessageSender implements MessageSender { private GrpcTxEvent convertEvent(TxEvent event) { ByteString payloads = ByteString.copyFrom(serializer.serialize(event.payloads())); + Builder builder = GrpcTxEvent.newBuilder() .setTimestamp(event.timestamp()) .setGlobalTxId(event.globalTxId()) .setLocalTxId(event.localTxId()) .setType(event.type()) .setPayloads(payloads); + if (event.parentTxId() != null) { builder.setParentTxId(event.parentTxId()); } diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java deleted file mode 100644 index b3f2b26..0000000 --- a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * - */ - -package io.servicecomb.saga.omega.connector.grpc; - -import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent; -import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint; -import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub; - -public class GrpcTxEventEndpointImpl implements GrpcTxEventEndpoint { - - private final TxEventServiceBlockingStub stub; - - public GrpcTxEventEndpointImpl(TxEventServiceBlockingStub stub) { - this.stub = stub; - } - - @Override - public void reportEvent(GrpcTxEvent event) { - stub.reportEvent(event); - } -} diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java deleted file mode 100644 index ca4f034..0000000 --- a/omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * - */ - -package io.servicecomb.saga.omega.connector.grpc; - -import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -import org.junit.Test; - -import io.servicecomb.saga.omega.transaction.MessageSerializer; -import io.servicecomb.saga.omega.transaction.TxEvent; -import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent; -import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint; - -public class GrpcClientMessageSenderTest { - private final String globalTxId = uniquify("global tx id"); - - private final String localTxId = uniquify("local tx id"); - - private final String parentTxId = uniquify("parent tx id"); - - private final String payload1 = uniquify("payload1"); - - private final String payload2 = uniquify("payload2"); - - private GrpcTxEvent grpcTxEvent; - - private final MessageSerializer serializer = new MessageSerializer() { - @Override - public byte[] serialize(TxEvent event) { - return serialize(event.payloads()); - } - - @Override - public byte[] serialize(Object[] objects) { - try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { - for (Object o : objects) { - stream.write(o.toString().getBytes()); - } - return stream.toByteArray(); - } catch (IOException e) { - throw new IllegalStateException(e); - } - } - }; - - - private final GrpcTxEventEndpoint eventService = new GrpcTxEventEndpoint() { - @Override - public void reportEvent(GrpcTxEvent event) { - grpcTxEvent = event; - } - }; - - private final GrpcClientMessageSender messageSender = new GrpcClientMessageSender(eventService, serializer); - - @Test - public void sendSerializedEvent() throws Exception { - TxEvent event = new TxEvent(globalTxId, localTxId, parentTxId, payload1, payload2); - - messageSender.send(event); - - assertThat(grpcTxEvent.getGlobalTxId(), is(event.globalTxId())); - assertThat(grpcTxEvent.getLocalTxId(), is(event.localTxId())); - assertThat(grpcTxEvent.getParentTxId(), is(event.parentTxId())); - assertThat(grpcTxEvent.getPayloads().toByteArray(), is(serializer.serialize(event))); - } -} \ No newline at end of file diff --git a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java index 9b749fd..73e2212 100644 --- a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java +++ b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java @@ -18,7 +18,9 @@ package io.servicecomb.saga.omega.spring; import java.lang.invoke.MethodHandles; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import javax.annotation.PreDestroy; @@ -31,21 +33,17 @@ import org.springframework.context.annotation.Configuration; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender; -import io.servicecomb.saga.omega.connector.grpc.GrpcTxEventEndpointImpl; import io.servicecomb.saga.omega.context.IdGenerator; import io.servicecomb.saga.omega.context.OmegaContext; import io.servicecomb.saga.omega.context.UniqueIdGenerator; import io.servicecomb.saga.omega.format.NativeMessageFormat; import io.servicecomb.saga.omega.transaction.MessageSender; -import io.servicecomb.saga.omega.transaction.MessageSerializer; -import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc; -import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub; @Configuration class OmegaSpringConfig { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private ManagedChannel clientChannel; + private final List<ManagedChannel> channels = new ArrayList<>(); @Bean IdGenerator<String> idGenerator() { @@ -59,7 +57,7 @@ class OmegaSpringConfig { @PreDestroy void close() { - clientChannel.shutdown(); + channels.forEach(ManagedChannel::shutdown); } @Bean @@ -67,8 +65,7 @@ class OmegaSpringConfig { // TODO: 2017/12/26 connect to the one with lowest latency for (String address : addresses) { try { - String[] pair = address.split(":"); - return createMessageSender(pair[0], Integer.parseInt(pair[1]), new NativeMessageFormat()); + return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat()); } catch (Exception e) { log.error("Unable to connect to alpha at {}", address, e); } @@ -78,10 +75,14 @@ class OmegaSpringConfig { "None of the alpha cluster is reachable: " + Arrays.toString(addresses)); } - private GrpcClientMessageSender createMessageSender(String host, int port, MessageSerializer serializer) { - clientChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build(); - TxEventServiceBlockingStub stub = TxEventServiceGrpc.newBlockingStub(clientChannel); - GrpcTxEventEndpointImpl eventService = new GrpcTxEventEndpointImpl(stub); - return new GrpcClientMessageSender(eventService, serializer); + private ManagedChannel grpcChannel(String address) { + String[] pair = address.split(":"); + + ManagedChannel channel = ManagedChannelBuilder.forAddress(pair[0], Integer.parseInt(pair[1])) + .usePlaintext(true) + .build(); + + channels.add(channel); + return channel; } } diff --git a/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java b/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java deleted file mode 100644 index 32a3b6b..0000000 --- a/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * - */ - -package io.servicecomb.saga.pack.contract.grpc; - -public interface GrpcTxEventEndpoint { - void reportEvent(GrpcTxEvent message); -} -- To stop receiving notification emails like this one, please contact "commits@servicecomb.apache.org" <commits@servicecomb.apache.org>.