This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 5328f26bb41b8799b76bffb7655fa758107fb294
Author: seanyinx <sean....@huawei.com>
AuthorDate: Fri Dec 29 17:47:13 2017 +0800

    SCB-149 removed unnecessary endpoint interface
    
    Signed-off-by: seanyinx <sean....@huawei.com>
---
 .../saga/alpha/server/GrpcTxEventEndpointImpl.java |  1 +
 integration-tests/coverage-aggregate/pom.xml       |  4 +
 .../connector/grpc/GrpcClientMessageSender.java    | 12 ++-
 .../connector/grpc/GrpcTxEventEndpointImpl.java    | 39 ----------
 .../grpc/GrpcClientMessageSenderTest.java          | 90 ----------------------
 .../saga/omega/spring/OmegaSpringConfig.java       | 27 +++----
 .../pack/contract/grpc/GrpcTxEventEndpoint.java    | 25 ------
 7 files changed, 27 insertions(+), 171 deletions(-)

diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
index 76ab346..42c597e 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/GrpcTxEventEndpointImpl.java
@@ -48,6 +48,7 @@ class GrpcTxEventEndpointImpl extends TxEventServiceImplBase {
         message.getCompensationMethod(),
         message.getPayloads().toByteArray()
     ));
+
     GrpcEmpty reply = GrpcEmpty.newBuilder().build();
     responseObserver.onNext(reply);
     responseObserver.onCompleted();
diff --git a/integration-tests/coverage-aggregate/pom.xml b/integration-tests/coverage-aggregate/pom.xml
index ffebdee..72a172d 100644
--- a/integration-tests/coverage-aggregate/pom.xml
+++ b/integration-tests/coverage-aggregate/pom.xml
@@ -69,6 +69,10 @@
     </dependency>
     <dependency>
       <groupId>io.servicecomb.saga</groupId>
+      <artifactId>omega-connector-grpc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.servicecomb.saga</groupId>
       <artifactId>alpha-server</artifactId>
     </dependency>
     <dependency>
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
index 25f6223..16f94b3 100644
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
+++ b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSender.java
@@ -22,21 +22,23 @@ package io.servicecomb.saga.omega.connector.grpc;
 
 import com.google.protobuf.ByteString;
 
+import io.grpc.ManagedChannel;
 import io.servicecomb.saga.omega.transaction.MessageSender;
 import io.servicecomb.saga.omega.transaction.MessageSerializer;
 import io.servicecomb.saga.omega.transaction.TxEvent;
 import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
 import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent.Builder;
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint;
+import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
+import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
 
 public class GrpcClientMessageSender implements MessageSender {
 
-  private final GrpcTxEventEndpoint eventService;
+  private final TxEventServiceBlockingStub eventService;
 
   private final MessageSerializer serializer;
 
-  public GrpcClientMessageSender(GrpcTxEventEndpoint eventService, MessageSerializer serializer) {
-    this.eventService = eventService;
+  public GrpcClientMessageSender(ManagedChannel eventService, MessageSerializer serializer) {
+    this.eventService = TxEventServiceGrpc.newBlockingStub(eventService);
     this.serializer = serializer;
   }
 
@@ -47,12 +49,14 @@ public class GrpcClientMessageSender implements MessageSender {
 
   private GrpcTxEvent convertEvent(TxEvent event) {
     ByteString payloads = ByteString.copyFrom(serializer.serialize(event.payloads()));
+
     Builder builder = GrpcTxEvent.newBuilder()
         .setTimestamp(event.timestamp())
         .setGlobalTxId(event.globalTxId())
         .setLocalTxId(event.localTxId())
         .setType(event.type())
         .setPayloads(payloads);
+
     if (event.parentTxId() != null) {
       builder.setParentTxId(event.parentTxId());
     }
diff --git a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java b/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java
deleted file mode 100644
index b3f2b26..0000000
--- a/omega/omega-connector/omega-connector-grpc/src/main/java/io/servicecomb/saga/omega/connector/grpc/GrpcTxEventEndpointImpl.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- *
- */
-
-package io.servicecomb.saga.omega.connector.grpc;
-
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint;
-import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
-
-public class GrpcTxEventEndpointImpl implements GrpcTxEventEndpoint {
-
-  private final TxEventServiceBlockingStub stub;
-
-  public GrpcTxEventEndpointImpl(TxEventServiceBlockingStub stub) {
-    this.stub = stub;
-  }
-
-  @Override
-  public void reportEvent(GrpcTxEvent event) {
-    stub.reportEvent(event);
-  }
-}
diff --git a/omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java b/omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java
deleted file mode 100644
index ca4f034..0000000
--- a/omega/omega-connector/omega-connector-grpc/src/test/java/io/servicecomb/saga/omega/connector/grpc/GrpcClientMessageSenderTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- *
- */
-
-package io.servicecomb.saga.omega.connector.grpc;
-
-import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-import org.junit.Test;
-
-import io.servicecomb.saga.omega.transaction.MessageSerializer;
-import io.servicecomb.saga.omega.transaction.TxEvent;
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEvent;
-import io.servicecomb.saga.pack.contract.grpc.GrpcTxEventEndpoint;
-
-public class GrpcClientMessageSenderTest {
-  private final String globalTxId = uniquify("global tx id");
-
-  private final String localTxId = uniquify("local tx id");
-
-  private final String parentTxId = uniquify("parent tx id");
-
-  private final String payload1 = uniquify("payload1");
-
-  private final String payload2 = uniquify("payload2");
-
-  private GrpcTxEvent grpcTxEvent;
-
-  private final MessageSerializer serializer = new MessageSerializer() {
-    @Override
-    public byte[] serialize(TxEvent event) {
-      return serialize(event.payloads());
-    }
-
-    @Override
-    public byte[] serialize(Object[] objects) {
-      try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
-        for (Object o : objects) {
-          stream.write(o.toString().getBytes());
-        }
-        return stream.toByteArray();
-      } catch (IOException e) {
-        throw new IllegalStateException(e);
-      }
-    }
-  };
-
-
-  private final GrpcTxEventEndpoint eventService = new GrpcTxEventEndpoint() {
-    @Override
-    public void reportEvent(GrpcTxEvent event) {
-      grpcTxEvent = event;
-    }
-  };
-
-  private final GrpcClientMessageSender messageSender = new GrpcClientMessageSender(eventService, serializer);
-
-  @Test
-  public void sendSerializedEvent() throws Exception {
-    TxEvent event = new TxEvent(globalTxId, localTxId, parentTxId, payload1, payload2);
-
-    messageSender.send(event);
-
-    assertThat(grpcTxEvent.getGlobalTxId(), is(event.globalTxId()));
-    assertThat(grpcTxEvent.getLocalTxId(), is(event.localTxId()));
-    assertThat(grpcTxEvent.getParentTxId(), is(event.parentTxId()));
-    assertThat(grpcTxEvent.getPayloads().toByteArray(), is(serializer.serialize(event)));
-  }
-}
\ No newline at end of file
diff --git a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
index 9b749fd..73e2212 100644
--- a/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
+++ b/omega/omega-spring-starter/src/main/java/io/servicecomb/saga/omega/spring/OmegaSpringConfig.java
@@ -18,7 +18,9 @@
 package io.servicecomb.saga.omega.spring;
 
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import javax.annotation.PreDestroy;
 
@@ -31,21 +33,17 @@ import org.springframework.context.annotation.Configuration;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import io.servicecomb.saga.omega.connector.grpc.GrpcClientMessageSender;
-import io.servicecomb.saga.omega.connector.grpc.GrpcTxEventEndpointImpl;
 import io.servicecomb.saga.omega.context.IdGenerator;
 import io.servicecomb.saga.omega.context.OmegaContext;
 import io.servicecomb.saga.omega.context.UniqueIdGenerator;
 import io.servicecomb.saga.omega.format.NativeMessageFormat;
 import io.servicecomb.saga.omega.transaction.MessageSender;
-import io.servicecomb.saga.omega.transaction.MessageSerializer;
-import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc;
-import io.servicecomb.saga.pack.contract.grpc.TxEventServiceGrpc.TxEventServiceBlockingStub;
 
 @Configuration
 class OmegaSpringConfig {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private ManagedChannel clientChannel;
+  private final List<ManagedChannel> channels = new ArrayList<>();
 
   @Bean
   IdGenerator<String> idGenerator() {
@@ -59,7 +57,7 @@ class OmegaSpringConfig {
 
   @PreDestroy
   void close() {
-    clientChannel.shutdown();
+    channels.forEach(ManagedChannel::shutdown);
   }
 
   @Bean
@@ -67,8 +65,7 @@ class OmegaSpringConfig {
     // TODO: 2017/12/26 connect to the one with lowest latency
     for (String address : addresses) {
       try {
-        String[] pair = address.split(":");
-        return createMessageSender(pair[0], Integer.parseInt(pair[1]), new NativeMessageFormat());
+        return new GrpcClientMessageSender(grpcChannel(address), new NativeMessageFormat());
       } catch (Exception e) {
         log.error("Unable to connect to alpha at {}", address, e);
       }
@@ -78,10 +75,14 @@ class OmegaSpringConfig {
         "None of the alpha cluster is reachable: " + Arrays.toString(addresses));
   }
 
-  private GrpcClientMessageSender createMessageSender(String host, int port, MessageSerializer serializer) {
-    clientChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();
-    TxEventServiceBlockingStub stub = TxEventServiceGrpc.newBlockingStub(clientChannel);
-    GrpcTxEventEndpointImpl eventService = new GrpcTxEventEndpointImpl(stub);
-    return new GrpcClientMessageSender(eventService, serializer);
+  private ManagedChannel grpcChannel(String address) {
+    String[] pair = address.split(":");
+
+    ManagedChannel channel = ManagedChannelBuilder.forAddress(pair[0], Integer.parseInt(pair[1]))
+        .usePlaintext(true)
+        .build();
+
+    channels.add(channel);
+    return channel;
   }
 }
diff --git a/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java b/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java
deleted file mode 100644
index 32a3b6b..0000000
--- a/pack-contracts/pack-contract-grpc/src/main/java/io/servicecomb/saga/pack/contract/grpc/GrpcTxEventEndpoint.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- *
- */
-
-package io.servicecomb.saga.pack.contract.grpc;
-
-public interface GrpcTxEventEndpoint {
-  void reportEvent(GrpcTxEvent message);
-}

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <commits@servicecomb.apache.org>.

Reply via email to