This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 46f0f3b950419439287f760a54d868660005435a Author: cherrylzhao <[email protected]> AuthorDate: Wed Aug 22 22:59:49 2018 +0800 SCB-856 Add junit test framework code for alpha TCC workflow. --- .../saga/alpha/tcc/server/AlphaTccServerTest.java | 125 +++++++++++++++++++++ .../tcc/server/common/AlphaTccApplication.java | 28 +++++ .../saga/alpha/tcc/server/common/Bootstrap.java | 23 ++++ .../alpha/tcc/server/common/GrpcBootstrap.java | 109 ++++++++++++++++++ .../tcc/server/common/GrpcTccServerConfig.java | 75 +++++++++++++ .../common/TccCoordinateCommandStreamObserver.java | 52 +++++++++ 6 files changed, 412 insertions(+) diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java new file mode 100644 index 0000000..5e303e4 --- /dev/null +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/AlphaTccServerTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.tcc.server; + +import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; + +import io.grpc.ManagedChannel; +import io.grpc.netty.NettyChannelBuilder; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import javax.annotation.PostConstruct; +import org.apache.servicecomb.saga.alpha.tcc.server.common.AlphaTccApplication; +import org.apache.servicecomb.saga.alpha.tcc.server.common.Bootstrap; +import org.apache.servicecomb.saga.alpha.tcc.server.common.GrpcBootstrap; +import org.apache.servicecomb.saga.alpha.tcc.server.common.GrpcTccServerConfig; +import org.apache.servicecomb.saga.alpha.tcc.server.common.TccCoordinateCommandStreamObserver; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcAck; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcServiceConfig; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand; +import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc; +import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceBlockingStub; +import org.apache.servicecomb.saga.pack.contract.grpc.TccEventServiceGrpc.TccEventServiceStub; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = {AlphaTccApplication.class}, + properties = { + "alpha.server.host=0.0.0.0", + "alpha.server.port=8098", + "alpha.event.pollingInterval=1" + }) +public class AlphaTccServerTest { + + @Autowired + private GrpcTccServerConfig grpcTccServerConfig; + + private static GrpcTccServerConfig serverConfig; + @PostConstruct + public void init() { + serverConfig = grpcTccServerConfig; + server = new GrpcBootstrap(serverConfig, new GrpcTccEventService()); + new Thread(server::start).start(); + } + + private static final int port = 8090; + private static Bootstrap server; + protected static ManagedChannel clientChannel; + + private final TccEventServiceStub asyncStub = TccEventServiceGrpc.newStub(clientChannel); + + private final TccEventServiceBlockingStub blockingStub = TccEventServiceGrpc.newBlockingStub(clientChannel); + + private static final Queue<GrpcTccCordinateCommand> receivedCommands = new ConcurrentLinkedQueue<>(); + + private final TccCoordinateCommandStreamObserver commandStreamObserver = + new TccCoordinateCommandStreamObserver(this::onCompensation, receivedCommands); + + private final String globalTxId = UUID.randomUUID().toString(); + private final String localTxId = UUID.randomUUID().toString(); + private final String parentTxId = UUID.randomUUID().toString(); + private final String compensationMethod = getClass().getCanonicalName(); + + private final String serviceName = uniquify("serviceName"); + private final String instanceId = uniquify("instanceId"); + + private final GrpcServiceConfig serviceConfig = GrpcServiceConfig.newBuilder() + .setServiceName(serviceName) + .setInstanceId(instanceId) + .build(); + + @BeforeClass + public static void setupClientChannel() { + clientChannel = NettyChannelBuilder.forAddress("localhost", port).usePlaintext().build(); + } + + @AfterClass + public static void tearDown() { + clientChannel.shutdown(); + clientChannel = null; + } + + @Before + public void before() { + System.out.println(" globalTxId " + globalTxId); + } + + @After + public void after() { +// blockingStub.onDisconnected(serviceConfig); + } + + @Test + public void assertOnConnect() { +// asyncStub.onConnected(serviceConfig, commandStreamObserver); + } + + private GrpcAck onCompensation(GrpcTccCordinateCommand command) { + return GrpcAck.newBuilder().setAborted(false).build(); + } + +} diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/AlphaTccApplication.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/AlphaTccApplication.java new file mode 100644 index 0000000..464dffc --- /dev/null +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/AlphaTccApplication.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.tcc.server.common; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class AlphaTccApplication { + public static void main(String[] args) { + SpringApplication.run(AlphaTccApplication.class, args); + } +} diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/Bootstrap.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/Bootstrap.java new file mode 100644 index 0000000..6382378 --- /dev/null +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/Bootstrap.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.tcc.server.common; + +public interface Bootstrap { + + void start(); +} diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcBootstrap.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcBootstrap.java new file mode 100644 index 0000000..8021161 --- /dev/null +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcBootstrap.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.tcc.server.common; + +import io.grpc.BindableService; +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.NettyServerBuilder; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import java.io.IOException; +import java.io.InputStream; +import java.lang.invoke.MethodHandles; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Properties; +import javax.net.ssl.SSLException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GrpcBootstrap implements Bootstrap { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final Server server; + + public GrpcBootstrap(GrpcTccServerConfig serverConfig, BindableService... services) { + ServerBuilder<?> serverBuilder; + if (serverConfig.isSslEnable()) { + serverBuilder = NettyServerBuilder.forAddress( + new InetSocketAddress(serverConfig.getHost(), serverConfig.getPort())); + + try { + ((NettyServerBuilder) serverBuilder).sslContext(getSslContextBuilder(serverConfig).build()); + } catch (SSLException e) { + throw new IllegalStateException("Unable to setup grpc to use SSL.", e); + } + } else { + serverBuilder = ServerBuilder.forPort(serverConfig.getPort()); + } + Arrays.stream(services).forEach(serverBuilder::addService); + server = serverBuilder.build(); + } + + @Override + public void start() { + Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown)); + + try { + server.start(); + server.awaitTermination(); + } catch (IOException e) { + throw new IllegalStateException("Unable to start grpc server.", e); + } catch (InterruptedException e) { + LOG.error("grpc server was interrupted.", e); + Thread.currentThread().interrupt(); + } + } + + private SslContextBuilder getSslContextBuilder(GrpcTccServerConfig config) { + + Properties prop = new Properties(); + ClassLoader classLoader = getClass().getClassLoader(); + try { + prop.load(classLoader.getResourceAsStream("ssl.properties")); + } catch (IOException e) { + throw new IllegalStateException("Unable to read ssl.properties.", e); + } + + InputStream cert = getInputStream(classLoader, config.getCert(), "Server Cert"); + InputStream key = getInputStream(classLoader, config.getKey(), "Server Key"); + + SslContextBuilder sslClientContextBuilder = SslContextBuilder.forServer(cert, key) + .protocols(prop.getProperty("protocols")) + .ciphers(Arrays.asList(prop.getProperty("ciphers").split(","))); + if (config.isMutualAuth()) { + InputStream clientCert = getInputStream(classLoader, config.getClientCert(), "Client Cert"); + sslClientContextBuilder.trustManager(clientCert); + sslClientContextBuilder.clientAuth(ClientAuth.REQUIRE); + } + return GrpcSslContexts.configure(sslClientContextBuilder, + SslProvider.OPENSSL); + } + + private InputStream getInputStream(ClassLoader classLoader, String resource, String config) { + InputStream is = classLoader.getResourceAsStream(resource); + if (is == null) { + throw new IllegalStateException("Cannot load the " + config + " from " + resource); + } + return is; + + } +} diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcTccServerConfig.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcTccServerConfig.java new file mode 100644 index 0000000..f40076c --- /dev/null +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/GrpcTccServerConfig.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.tcc.server.common; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class GrpcTccServerConfig { + @Value("${alpha.server.host:0.0.0.0}") + private String host; + + @Value("${alpha.server.port:8080}") + private int port; + + @Value("${alpha.server.ssl.enable:false}") + private boolean sslEnable; + + @Value("${alpha.server.ssl.cert:server.crt}") + private String cert; + + @Value("${alpha.server.ssl.key:server.pem}") + private String key; + + @Value("${alpha.server.ssl.mutualAuth:false}") + private boolean mutualAuth; + + @Value("${alpha.server.ssl.clientCert:client.crt}") + private String clientCert; + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public boolean isSslEnable() { + return sslEnable; + } + + public String getCert() { + return cert; + } + + public String getKey() { + return key; + } + + public boolean isMutualAuth() { + return mutualAuth; + } + + public String getClientCert() { + return clientCert; + } +} + + diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java new file mode 100644 index 0000000..7f1295b --- /dev/null +++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/saga/alpha/tcc/server/common/TccCoordinateCommandStreamObserver.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.saga.alpha.tcc.server.common; + +import io.grpc.stub.StreamObserver; +import java.util.Queue; +import java.util.function.Consumer; +import org.apache.servicecomb.saga.pack.contract.grpc.GrpcTccCordinateCommand; + +public class TccCoordinateCommandStreamObserver implements StreamObserver<GrpcTccCordinateCommand> { + + private static Queue<GrpcTccCordinateCommand> receivedCommands; + private Consumer<GrpcTccCordinateCommand> consumer; + private boolean completed = false; + + public TccCoordinateCommandStreamObserver(Consumer<GrpcTccCordinateCommand> consumer, + Queue<GrpcTccCordinateCommand> receivedCommands) { + this.consumer = consumer; + TccCoordinateCommandStreamObserver.receivedCommands = receivedCommands; + } + + @Override + public void onNext(GrpcTccCordinateCommand value) { + consumer.accept(value); + receivedCommands.add(value); + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onCompleted() { + completed = true; + } +}
