// http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
// ----------------------------------------------------------------------
// diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
// new file mode 100644
// index 0000000..ed6b634
// --- /dev/null
// +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
// @@ -0,0 +1,182 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.rpc;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Static factory for the Netty event loop groups used by the RPC layer.
 *
 * <p>Client-side groups are kept in a per-{@link ClientChannelId} pool and handed out in
 * rotation, so that all clients with the same id share threads. Server-side groups are created
 * on demand and owned by the caller. A JVM shutdown hook registered in the static initializer
 * shuts the pooled client groups down.</p>
 */
public final class RpcChannelFactory {
  private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class);

  /** Worker count used when the caller does not specify one: twice the available processors. */
  private static final int DEFAULT_WORKER_NUM = Runtime.getRuntime().availableProcessors() * 2;

  /** Guards every read/modify of {@link #eventLoopGroupPool} done by the public methods. */
  private static final Object lockObjectForLoopGroup = new Object();

  /** Monotonic counter appended to server names so that thread names stay distinguishable. */
  private static final AtomicInteger serverCount = new AtomicInteger(0);

  /** Identifies which shared client pool an event loop group belongs to. */
  public enum ClientChannelId {
    CLIENT_DEFAULT,
    FETCHER
  }

  /** Number of event loop groups created for each client id when its pool is first built. */
  private static final Map<ClientChannelId, Integer> defaultMaxKeyPoolCount =
      new ConcurrentHashMap<ClientChannelId, Integer>();

  /** The pooled client event loop groups, keyed by client id. */
  private static final Map<ClientChannelId, Queue<EventLoopGroup>> eventLoopGroupPool =
      new ConcurrentHashMap<ClientChannelId, Queue<EventLoopGroup>>();

  private RpcChannelFactory() {
    // static utility holder; never instantiated
  }

  static {
    Runtime.getRuntime().addShutdownHook(new CleanUpHandler());

    defaultMaxKeyPoolCount.put(ClientChannelId.CLIENT_DEFAULT, 1);
    defaultMaxKeyPoolCount.put(ClientChannelId.FETCHER, 1);
  }

  /**
   * Returns the shared {@link ClientChannelId#CLIENT_DEFAULT} event loop group, sized with the
   * default worker count if it has to be created.
   *
   * @return a shared client event loop group
   */
  public static EventLoopGroup getSharedClientEventloopGroup() {
    return getSharedClientEventloopGroup(DEFAULT_WORKER_NUM);
  }

  /**
   * Returns the shared {@link ClientChannelId#CLIENT_DEFAULT} event loop group.
   *
   * @param workerNum number of worker threads to use if a group has to be created
   * @return a shared client event loop group
   */
  public static EventLoopGroup getSharedClientEventloopGroup(int workerNum) {
    return getSharedClientEventloopGroup(ClientChannelId.CLIENT_DEFAULT, workerNum);
  }

  /**
   * Returns an event loop group from the pool registered under {@code clientId}.
   *
   * <p>The pool is created on first use. The group at the head of the pool is taken, replaced by
   * a fresh group if it is missing or shutting down, and then re-appended to the tail, so
   * successive calls rotate through the pooled groups.</p>
   *
   * @param clientId  pool to take the group from
   * @param workerNum number of worker threads to use for any group that has to be created
   * @return a usable event loop group that remains owned by the pool
   */
  public static EventLoopGroup getSharedClientEventloopGroup(ClientChannelId clientId, int workerNum) {
    synchronized (lockObjectForLoopGroup) {
      Queue<EventLoopGroup> pool = eventLoopGroupPool.get(clientId);
      if (pool == null) {
        pool = createClientEventloopGroups(clientId, workerNum);
      }

      EventLoopGroup head = pool.poll();
      EventLoopGroup selected = isEventLoopGroupShuttingDown(head)
          ? createClientEventloopGroup(clientId.name(), workerNum)
          : head;

      // put it back at the tail so the next caller gets the next group in the rotation
      pool.add(selected);
      return selected;
    }
  }

  /**
   * Tells whether the given group can no longer be handed out.
   *
   * @param eventLoopGroup group to inspect, may be {@code null}
   * @return {@code true} if the group is {@code null} or reports that it is shutting down
   */
  protected static boolean isEventLoopGroupShuttingDown(EventLoopGroup eventLoopGroup) {
    if (eventLoopGroup == null) {
      return true;
    }
    return eventLoopGroup.isShuttingDown();
  }

  /**
   * Creates, registers and fills the pool for {@code clientId}.
   * The caller is responsible for eventually releasing the created groups.
   *
   * @param clientId  pool key; must have an entry in the default pool-size table
   * @param workerNum number of worker threads per created group
   * @return the newly registered pool
   */
  protected static Queue<EventLoopGroup> createClientEventloopGroups(ClientChannelId clientId, int workerNum) {
    int poolSize = defaultMaxKeyPoolCount.get(clientId);
    Queue<EventLoopGroup> pool = new ConcurrentLinkedQueue<EventLoopGroup>();
    eventLoopGroupPool.put(clientId, pool);

    int created = 0;
    while (created < poolSize) {
      pool.add(createClientEventloopGroup(clientId.name(), workerNum));
      created++;
    }

    return pool;
  }

  /**
   * Creates a NIO event loop group whose threads are named {@code "<name> Client #<n>"}.
   *
   * @param name      prefix used in the thread names
   * @param workerNum number of threads in the group
   * @return a new, unpooled event loop group
   */
  protected static EventLoopGroup createClientEventloopGroup(String name, int workerNum) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Create " + name + " ClientEventLoopGroup. Worker:" + workerNum);
    }

    ThreadFactory clientFactory =
        new ThreadFactoryBuilder().setNameFormat(name + " Client #%d").build();

    return new NioEventLoopGroup(workerNum, clientFactory);
  }

  /**
   * Creates a {@link ServerBootstrap} backed by a one-thread boss group and a worker group of
   * {@code workerNum} threads. A running server number is appended to {@code name} for the
   * thread names. The caller must release the groups of the returned bootstrap.
   *
   * @param name      prefix used in the log message and the thread names
   * @param workerNum number of worker threads
   * @return a bootstrap with both groups already assigned
   */
  public static ServerBootstrap createServerChannelFactory(String name, int workerNum) {
    String serverName = name + "-" + serverCount.incrementAndGet();
    if (LOG.isInfoEnabled()) {
      LOG.info("Create " + serverName + " ServerSocketChannelFactory. Worker:" + workerNum);
    }

    ThreadFactory bossFactory =
        new ThreadFactoryBuilder().setNameFormat(serverName + " Server Boss #%d").build();
    ThreadFactory workerFactory =
        new ThreadFactoryBuilder().setNameFormat(serverName + " Server Worker #%d").build();

    EventLoopGroup bossGroup = new NioEventLoopGroup(1, bossFactory);
    EventLoopGroup workerGroup = new NioEventLoopGroup(workerNum, workerFactory);

    return new ServerBootstrap().group(bossGroup, workerGroup);
  }

  /**
   * Requests a graceful shutdown of every pooled client event loop group and empties the pool.
   */
  public static void shutdownGracefully() {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Shutdown Shared RPC Pool");
    }

    synchronized (lockObjectForLoopGroup) {
      for (Queue<EventLoopGroup> pool : eventLoopGroupPool.values()) {
        // drain the queue: every group is shut down and removed in one pass
        for (EventLoopGroup group = pool.poll(); group != null; group = pool.poll()) {
          group.shutdownGracefully();
        }
      }
      eventLoopGroupPool.clear();
    }
  }

  /** Shutdown hook thread that releases the shared client pools when the JVM exits. */
  static class CleanUpHandler extends Thread {

    @Override
    public void run() {
      RpcChannelFactory.shutdownGracefully();
    }

  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcEventListener.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcEventListener.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcEventListener.java new file mode 100644 index 0000000..4d72536 --- /dev/null +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcEventListener.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +/** + * Event listener for netty code. Users can subscribe events by using this interface. + */ +public interface RpcEventListener { + + /** + * Performs actions before start. + * @param obj Method caller + */ + public void onBeforeStart(Object obj); + + /** + * Performs actions after start. + * @param obj Method caller + */ + public void onAfterStart(Object obj); + + /** + * Performs actions before initialization. + * @param obj Method caller + */ + public void onBeforeInit(Object obj); + + /** + * Performs actions after initialization. 
+ * @param obj Method caller + */ + public void onAfterInit(Object obj); + + /** + * Performs actions before shutdown. + * @param obj Method caller + */ + public void onBeforeShutdown(Object obj); + + /** + * Performs actions after shutdown. + * @param obj Method caller + */ + public void onAfterShutdown(Object obj); + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java new file mode 100644 index 0000000..152d426 --- /dev/null +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcUtils.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.rpc; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.concurrent.atomic.AtomicReference; + +public class RpcUtils { + + public static String normalizeInetSocketAddress(InetSocketAddress addr) { + return addr.getAddress().getHostAddress() + ":" + addr.getPort(); + } + + /** + * Util method to build socket addr from either: + * <host> + * <host>:<port> + * <fs>://<host>:<port>/<path> + */ + public static InetSocketAddress createSocketAddr(String host, int port) { + return new InetSocketAddress(host, port); + } + + /** + * Returns InetSocketAddress that a client can use to + * connect to the server. NettyServerBase.getListenerAddress() is not correct when + * the server binds to "0.0.0.0". This returns "hostname:port" of the server, + * or "127.0.0.1:port" when the getListenerAddress() returns "0.0.0.0:port". + * + * @param addr of a listener + * @return socket address that a client can use to connect to the server. 
+ */ + public static InetSocketAddress getConnectAddress(InetSocketAddress addr) { + if (!addr.isUnresolved() && addr.getAddress().isAnyLocalAddress()) { + try { + addr = new InetSocketAddress(InetAddress.getLocalHost(), addr.getPort()); + } catch (UnknownHostException uhe) { + // shouldn't get here unless the host doesn't have a loopback iface + addr = new InetSocketAddress("127.0.0.1", addr.getPort()); + } + } + InetSocketAddress canonicalAddress = + new InetSocketAddress(addr.getAddress().getCanonicalHostName(), addr.getPort()); + return canonicalAddress; + } + + public static InetSocketAddress createUnresolved(String addr) { + String [] splitted = addr.split(":"); + return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1])); + } + + public static class Timer { + private long remaining; + private long prev; + public Timer(long timeout) { + this.remaining = timeout; + this.prev = System.currentTimeMillis(); + } + + public boolean isTimedOut() { + return remaining <= 0; + } + + public void elapsed() { + long current = System.currentTimeMillis(); + remaining -= (prev - current); + prev = current; + } + + public void interval(long wait) { + if (wait <= 0 || isTimedOut()) { + return; + } + try { + Thread.sleep(Math.min(remaining, wait)); + } catch (Exception ex) { + // ignore + } + } + + public long remaining() { + return remaining; + } + } + + public static class Scrutineer<T> { + + private final AtomicReference<T> reference = new AtomicReference<T>(); + + T check(T ticket) { + T granted = reference.get(); + for (;granted == null; granted = reference.get()) { + if (reference.compareAndSet(null, ticket)) { + return ticket; + } + } + return granted; + } + + boolean clear(T granted) { + return reference.compareAndSet(granted, null); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/pom.xml ---------------------------------------------------------------------- diff --git 
a/tajo-rpc/tajo-rpc-protobuf/pom.xml b/tajo-rpc/tajo-rpc-protobuf/pom.xml new file mode 100644 index 0000000..1f67255 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/pom.xml @@ -0,0 +1,274 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>tajo-project</artifactId> + <version>0.11.0-SNAPSHOT</version> + <groupId>org.apache.tajo</groupId> + <relativePath>../../tajo-project</relativePath> + </parent> + <packaging>jar</packaging> + <artifactId>tajo-rpc-protobuf</artifactId> + <name>Tajo Protocol Buffer Rpc</name> + <description>RPC Server/Client Implementation based on Netty and Protocol Buffer</description> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.6</source> + <target>1.6</target> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + 
<executions> + <execution> + <phase>verify</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <configuration> + </configuration> + <executions> + <execution> + <id>create-jar</id> + <phase>prepare-package</phase> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>create-protobuf-generated-sources-directory</id> + <phase>initialize</phase> + <configuration> + <target> + <mkdir dir="target/generated-sources/proto" /> + </target> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2</version> + <executions> + <execution> + <id>generate-sources</id> + <phase>generate-sources</phase> + <configuration> + <executable>protoc</executable> + <arguments> + <argument>-Isrc/main/proto/</argument> + <argument>--java_out=target/generated-sources/proto</argument> + <argument>src/main/proto/DummyProtos.proto</argument> + <argument>src/main/proto/RpcProtos.proto</argument> + <argument>src/main/proto/TestProtos.proto</argument> + <argument>src/main/proto/TestProtocol.proto</argument> + </arguments> + </configuration> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.5</version> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/proto</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + 
<groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + <version>2.15</version> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-rpc-common</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + <repositories> + <repository> + <id>repository.jboss.org</id> + <url>https://repository.jboss.org/nexus/content/repositories/releases/ + </url> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + </repositories> + + <profiles> + <profile> + <id>docs</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <execution> + <!-- build javadoc jars per jar for publishing to maven --> + <id>module-javadocs</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <destDir>${project.build.directory}</destDir> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>dist</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + 
<artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>dist</id> + <phase>package</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target> + <echo file="${project.build.directory}/dist-layout-stitching.sh"> + run() { + echo "\$ ${@}" + "${@}" + res=$? + if [ $res != 0 ]; then + echo + echo "Failed!" + echo + exit $res + fi + } + + ROOT=`cd ${basedir}/..;pwd` + echo + echo "Current directory `pwd`" + echo + run rm -rf ${project.artifactId}-${project.version} + run mkdir ${project.artifactId}-${project.version} + run cd ${project.artifactId}-${project.version} + run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar . + echo + echo "Tajo Rpc dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}" + echo + </echo> + <exec executable="sh" dir="${project.build.directory}" failonerror="true"> + <arg line="./dist-layout-stitching.sh" /> + </exec> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <reporting> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + <version>2.15</version> + </plugin> + </plugins> + </reporting> + +</project> http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java new file mode 100644 index 0000000..3d856ce --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.*; + +import io.netty.channel.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; +import org.apache.tajo.rpc.RpcProtos.RpcRequest; +import org.apache.tajo.rpc.RpcProtos.RpcResponse; + +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.GenericFutureListener; + +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class AsyncRpcClient extends NettyClientBase { + private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class); + + private final ConcurrentMap<Integer, ResponseCallback> requests = + new ConcurrentHashMap<Integer, ResponseCallback>(); + + private final Method stubMethod; + private final ProxyRpcChannel rpcChannel; + private final ClientChannelInboundHandler inboundHandler; + + /** + * Intentionally make this method package-private, avoiding user directly + * new an instance through this constructor. 
+ */ + AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries) + throws ClassNotFoundException, NoSuchMethodException { + super(rpcConnectionKey, retries); + stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class); + rpcChannel = new ProxyRpcChannel(); + inboundHandler = new ClientChannelInboundHandler(); + init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance())); + } + + @Override + public <T> T getStub() { + return getStub(stubMethod, rpcChannel); + } + + protected void sendExceptions(String message) { + for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) { + ResponseCallback callback = callbackEntry.getValue(); + Integer id = callbackEntry.getKey(); + + RpcResponse.Builder responseBuilder = RpcResponse.newBuilder() + .setErrorMessage(message) + .setId(id); + + callback.run(responseBuilder.build()); + } + } + + @Override + public void close() { + sendExceptions("AsyncRpcClient terminates all the connections"); + + super.close(); + } + + private class ProxyRpcChannel implements RpcChannel { + + public void callMethod(final MethodDescriptor method, + final RpcController controller, + final Message param, + final Message responseType, + RpcCallback<Message> done) { + + int nextSeqId = sequence.getAndIncrement(); + + Message rpcRequest = buildRequest(nextSeqId, method, param); + + inboundHandler.registerCallback(nextSeqId, + new ResponseCallback(controller, responseType, done)); + + ChannelPromise channelPromise = getChannel().newPromise(); + channelPromise.addListener(new GenericFutureListener<ChannelFuture>() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + inboundHandler.exceptionCaught(null, new ServiceException(future.cause())); + } + } + }); + getChannel().writeAndFlush(rpcRequest, channelPromise); + } + + private Message buildRequest(int seqId, + MethodDescriptor method, + Message param) { + + RpcRequest.Builder 
requestBuilder = RpcRequest.newBuilder() + .setId(seqId) + .setMethodName(method.getName()); + + if (param != null) { + requestBuilder.setRequestMessage(param.toByteString()); + } + + return requestBuilder.build(); + } + } + + private class ResponseCallback implements RpcCallback<RpcResponse> { + private final RpcController controller; + private final Message responsePrototype; + private final RpcCallback<Message> callback; + + public ResponseCallback(RpcController controller, + Message responsePrototype, + RpcCallback<Message> callback) { + this.controller = controller; + this.responsePrototype = responsePrototype; + this.callback = callback; + } + + @Override + public void run(RpcResponse rpcResponse) { + // if hasErrorMessage is true, it means rpc-level errors. + // it does not call the callback function\ + if (rpcResponse.hasErrorMessage()) { + if (controller != null) { + this.controller.setFailed(rpcResponse.getErrorMessage()); + } + callback.run(null); + } else { // if rpc call succeed + try { + Message responseMessage; + if (!rpcResponse.hasResponseMessage()) { + responseMessage = null; + } else { + responseMessage = responsePrototype.newBuilderForType().mergeFrom( + rpcResponse.getResponseMessage()).build(); + } + + callback.run(responseMessage); + + } catch (InvalidProtocolBufferException e) { + throw new RemoteException(getErrorMessage(""), e); + } + } + } + } + + private String getErrorMessage(String message) { + return "Exception [" + protocol.getCanonicalName() + + "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress) + getChannel().remoteAddress()) + ")]: " + message; + } + + @ChannelHandler.Sharable + private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter { + + void registerCallback(int seqId, ResponseCallback callback) { + + if (requests.putIfAbsent(seqId, callback) != null) { + throw new RemoteException( + getErrorMessage("Duplicate Sequence Id "+ seqId)); + } + } + + @Override + public void 
channelRead(ChannelHandlerContext ctx, Object msg) + throws Exception { + if (msg instanceof RpcResponse) { + try { + RpcResponse response = (RpcResponse) msg; + ResponseCallback callback = requests.remove(response.getId()); + + if (callback == null) { + LOG.warn("Dangling rpc call"); + } else { + callback.run(response); + } + } finally { + ReferenceCountUtil.release(msg); + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + LOG.error(getRemoteAddress() + "," + protocol + "," + cause.getMessage(), cause); + + sendExceptions(cause.getMessage()); + + if(LOG.isDebugEnabled()) { + LOG.error(cause.getMessage(), cause); + } else { + LOG.error("RPC Exception:" + cause.getMessage()); + } + + if (ctx != null && ctx.channel().isActive()) { + ctx.channel().close(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java new file mode 100644 index 0000000..3b5a747 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import com.google.protobuf.*; +import com.google.protobuf.Descriptors.MethodDescriptor; + +import io.netty.channel.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rpc.RpcProtos.RpcRequest; +import org.apache.tajo.rpc.RpcProtos.RpcResponse; + +import io.netty.util.ReferenceCountUtil; + +import java.lang.reflect.Method; +import java.net.InetSocketAddress; + +public class AsyncRpcServer extends NettyServerBase { + private static final Log LOG = LogFactory.getLog(AsyncRpcServer.class); + + private final Service service; + private final ChannelInitializer<Channel> initializer; + + public AsyncRpcServer(final Class<?> protocol, + final Object instance, + final InetSocketAddress bindAddress, + final int workerNum) + throws Exception { + super(protocol.getSimpleName(), bindAddress); + + String serviceClassName = protocol.getName() + "$" + + protocol.getSimpleName() + "Service"; + Class<?> serviceClass = Class.forName(serviceClassName); + Class<?> interfaceClass = Class.forName(serviceClassName + "$Interface"); + Method method = serviceClass.getMethod("newReflectiveService", interfaceClass); + this.service = (Service) method.invoke(null, instance); + + this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance()); + super.init(this.initializer, workerNum); + } + + @ChannelHandler.Sharable + private class ServerHandler extends ChannelInboundHandlerAdapter { + + @Override + public void 
channelRegistered(ChannelHandlerContext ctx) throws Exception { + accepted.add(ctx.channel()); + if(LOG.isDebugEnabled()){ + LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size())); + } + super.channelRegistered(ctx); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + accepted.remove(ctx.channel()); + if (LOG.isDebugEnabled()) { + LOG.debug(serviceName + " closes a connection. The number of current connections are " + accepted.size()); + } + super.channelUnregistered(ctx); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) + throws Exception { + if (msg instanceof RpcRequest) { + try { + final RpcRequest request = (RpcRequest) msg; + + String methodName = request.getMethodName(); + MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); + + if (methodDescriptor == null) { + throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); + } + + Message paramProto = null; + if (request.hasRequestMessage()) { + try { + paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() + .mergeFrom(request.getRequestMessage()).build(); + } catch (Throwable t) { + throw new RemoteCallException(request.getId(), methodDescriptor, t); + } + } + + final RpcController controller = new NettyRpcController(); + + RpcCallback<Message> callback = !request.hasId() ? 
null : new RpcCallback<Message>() { + + public void run(Message returnValue) { + + RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); + + if (returnValue != null) { + builder.setResponseMessage(returnValue.toByteString()); + } + + if (controller.failed()) { + builder.setErrorMessage(controller.errorText()); + } + + ctx.writeAndFlush(builder.build()); + } + }; + + service.callMethod(methodDescriptor, controller, paramProto, callback); + + } finally { + ReferenceCountUtil.release(msg); + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception{ + if (cause instanceof RemoteCallException) { + RemoteCallException callException = (RemoteCallException) cause; + ctx.writeAndFlush(callException.getResponse()); + } else { + LOG.error(cause.getMessage()); + } + + if (ctx != null && ctx.channel().isActive()) { + ctx.channel().close(); + } + } + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java new file mode 100644 index 0000000..6a90330 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import com.google.protobuf.*; +import com.google.protobuf.Descriptors.MethodDescriptor; + +import io.netty.channel.*; +import io.netty.util.concurrent.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; +import org.apache.tajo.rpc.RpcProtos.RpcRequest; +import org.apache.tajo.rpc.RpcProtos.RpcResponse; + +import io.netty.util.ReferenceCountUtil; + +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.Future; + +public class BlockingRpcClient extends NettyClientBase { + private static final Log LOG = LogFactory.getLog(RpcProtos.class); + + private final Map<Integer, ProtoCallFuture> requests = + new ConcurrentHashMap<Integer, ProtoCallFuture>(); + + private final Method stubMethod; + private final ProxyRpcChannel rpcChannel; + private final ChannelInboundHandlerAdapter inboundHandler; + + /** + * Intentionally make this method package-private, avoiding user directly + * new an instance through this constructor. 
+ */ + BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries) + throws ClassNotFoundException, NoSuchMethodException { + super(rpcConnectionKey, retries); + stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class); + rpcChannel = new ProxyRpcChannel(); + inboundHandler = new ClientChannelInboundHandler(); + init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance())); + } + + @Override + public <T> T getStub() { + return getStub(stubMethod, rpcChannel); + } + + @Override + public void close() { + for(ProtoCallFuture callback: requests.values()) { + callback.setFailed("BlockingRpcClient terminates all the connections", + new ServiceException("BlockingRpcClient terminates all the connections")); + } + + super.close(); + } + + private class ProxyRpcChannel implements BlockingRpcChannel { + + @Override + public Message callBlockingMethod(final MethodDescriptor method, + final RpcController controller, + final Message param, + final Message responsePrototype) + throws TajoServiceException { + + int nextSeqId = sequence.getAndIncrement(); + + Message rpcRequest = buildRequest(nextSeqId, method, param); + + ProtoCallFuture callFuture = + new ProtoCallFuture(controller, responsePrototype); + requests.put(nextSeqId, callFuture); + + ChannelPromise channelPromise = getChannel().newPromise(); + channelPromise.addListener(new GenericFutureListener<ChannelFuture>() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + inboundHandler.exceptionCaught(null, new ServiceException(future.cause())); + } + } + }); + getChannel().writeAndFlush(rpcRequest, channelPromise); + + try { + return callFuture.get(60, TimeUnit.SECONDS); + } catch (Throwable t) { + if (t instanceof ExecutionException) { + Throwable cause = t.getCause(); + if (cause != null && cause instanceof TajoServiceException) { + throw (TajoServiceException)cause; + } + } + throw new 
TajoServiceException(t.getMessage()); + } + } + + private Message buildRequest(int seqId, + MethodDescriptor method, + Message param) { + RpcRequest.Builder requestBuilder = RpcRequest.newBuilder() + .setId(seqId) + .setMethodName(method.getName()); + + if (param != null) { + requestBuilder.setRequestMessage(param.toByteString()); + } + + return requestBuilder.build(); + } + } + + private String getErrorMessage(String message) { + if(getChannel() != null) { + return protocol.getName() + + "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress) + getChannel().remoteAddress()) + "): " + message; + } else { + return "Exception " + message; + } + } + + private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) { + if(getChannel() != null) { + return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(), + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().remoteAddress())); + } else { + return new TajoServiceException(response.getErrorMessage()); + } + } + + @ChannelHandler.Sharable + private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws Exception { + + if (msg instanceof RpcResponse) { + try { + RpcResponse rpcResponse = (RpcResponse) msg; + ProtoCallFuture callback = requests.remove(rpcResponse.getId()); + + if (callback == null) { + LOG.warn("Dangling rpc call"); + } else { + if (rpcResponse.hasErrorMessage()) { + callback.setFailed(rpcResponse.getErrorMessage(), + makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace()))); + throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage())); + } else { + Message responseMessage; + + if (!rpcResponse.hasResponseMessage()) { + responseMessage = null; + } else { + responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage()) + .build(); + } + + 
callback.setResponse(responseMessage); + } + } + } finally { + ReferenceCountUtil.release(msg); + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + for(ProtoCallFuture callback: requests.values()) { + callback.setFailed(cause.getMessage(), cause); + } + + if(LOG.isDebugEnabled()) { + LOG.error("" + cause.getMessage(), cause); + } else { + LOG.error("RPC Exception:" + cause.getMessage()); + } + if (ctx != null && ctx.channel().isActive()) { + ctx.channel().close(); + } + } + } + + static class ProtoCallFuture implements Future<Message> { + private Semaphore sem = new Semaphore(0); + private Message response = null; + private Message returnType; + + private RpcController controller; + + private ExecutionException ee; + + public ProtoCallFuture(RpcController controller, Message message) { + this.controller = controller; + this.returnType = message; + } + + @Override + public boolean cancel(boolean arg0) { + return false; + } + + @Override + public Message get() throws InterruptedException, ExecutionException { + sem.acquire(); + if(ee != null) { + throw ee; + } + return response; + } + + @Override + public Message get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + if(sem.tryAcquire(timeout, unit)) { + if (ee != null) { + throw ee; + } + return response; + } else { + throw new TimeoutException(); + } + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return sem.availablePermits() > 0; + } + + public void setResponse(Message response) { + this.response = response; + sem.release(); + } + + public void setFailed(String errorText, Throwable t) { + if(controller != null) { + this.controller.setFailed(errorText); + } + ee = new ExecutionException(errorText, t); + sem.release(); + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java new file mode 100644 index 0000000..0ce359f --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.rpc; + +import com.google.protobuf.BlockingService; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; + +import io.netty.channel.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rpc.RpcProtos.RpcRequest; +import org.apache.tajo.rpc.RpcProtos.RpcResponse; + +import io.netty.util.ReferenceCountUtil; + +import java.lang.reflect.Method; +import java.net.InetSocketAddress; + +public class BlockingRpcServer extends NettyServerBase { + private static Log LOG = LogFactory.getLog(BlockingRpcServer.class); + private final BlockingService service; + private final ChannelInitializer<Channel> initializer; + + public BlockingRpcServer(final Class<?> protocol, + final Object instance, + final InetSocketAddress bindAddress, + final int workerNum) + throws Exception { + + super(protocol.getSimpleName(), bindAddress); + + String serviceClassName = protocol.getName() + "$" + + protocol.getSimpleName() + "Service"; + Class<?> serviceClass = Class.forName(serviceClassName); + Class<?> interfaceClass = Class.forName(serviceClassName + + "$BlockingInterface"); + Method method = serviceClass.getMethod( + "newReflectiveBlockingService", interfaceClass); + + this.service = (BlockingService) method.invoke(null, instance); + this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance()); + + super.init(this.initializer, workerNum); + } + + @ChannelHandler.Sharable + private class ServerHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + accepted.add(ctx.channel()); + if(LOG.isDebugEnabled()){ + LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size())); + } + super.channelRegistered(ctx); + } + + @Override + public void 
channelUnregistered(ChannelHandlerContext ctx) throws Exception { + accepted.remove(ctx.channel()); + if (LOG.isDebugEnabled()) { + LOG.debug(serviceName + " closes a connection. The number of current connections are " + accepted.size()); + } + super.channelUnregistered(ctx); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws Exception { + + if (msg instanceof RpcRequest) { + try { + final RpcRequest request = (RpcRequest) msg; + + String methodName = request.getMethodName(); + MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); + + if (methodDescriptor == null) { + throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); + } + Message paramProto = null; + if (request.hasRequestMessage()) { + try { + paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() + .mergeFrom(request.getRequestMessage()).build(); + + } catch (Throwable t) { + throw new RemoteCallException(request.getId(), methodDescriptor, t); + } + } + Message returnValue; + RpcController controller = new NettyRpcController(); + + try { + returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto); + } catch (Throwable t) { + throw new RemoteCallException(request.getId(), methodDescriptor, t); + } + + RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); + + if (returnValue != null) { + builder.setResponseMessage(returnValue.toByteString()); + } + + if (controller.failed()) { + builder.setErrorMessage(controller.errorText()); + } + ctx.writeAndFlush(builder.build()); + } finally { + ReferenceCountUtil.release(msg); + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (cause instanceof RemoteCallException) { + RemoteCallException callException = (RemoteCallException) cause; + ctx.writeAndFlush(callException.getResponse()); + } + + if (ctx != null && 
ctx.channel().isActive()) { + ctx.channel().close(); + } + } + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java new file mode 100644 index 0000000..c4c3256 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CallFuture.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.rpc; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; + +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class CallFuture<T> implements RpcCallback<T>, Future<T> { + + private final Semaphore sem = new Semaphore(0); + private boolean done = false; + private T response; + private RpcController controller; + + public CallFuture() { + controller = new DefaultRpcController(); + } + + public RpcController getController() { + return controller; + } + + @Override + public void run(T t) { + this.response = t; + done = true; + sem.release(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + controller.startCancel(); + sem.release(); + return controller.isCanceled(); + } + + @Override + public boolean isCancelled() { + return controller.isCanceled(); + } + + @Override + public boolean isDone() { + return done; + } + + @Override + public T get() throws InterruptedException { + sem.acquire(); + + return response; + } + + @Override + public T get(long timeout, TimeUnit unit) + throws InterruptedException, TimeoutException { + if (sem.tryAcquire(timeout, unit)) { + return response; + } else { + throw new TimeoutException(); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java new file mode 100644 index 0000000..4ba19a5 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; + +public class DefaultRpcController implements RpcController { + private String errorText; + private boolean error; + private boolean canceled; + + @Override + public void reset() { + errorText = ""; + error = false; + canceled = false; + } + + @Override + public boolean failed() { + return error; + } + + @Override + public String errorText() { + return errorText; + } + + @Override + public void startCancel() { + this.canceled = true; + } + + @Override + public void setFailed(String s) { + this.errorText = s; + this.error = true; + } + + @Override + public boolean isCanceled() { + return canceled; + } + + @Override + public void notifyOnCancel(RpcCallback<Object> objectRpcCallback) { + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java new file mode 100644 index 
0000000..72278f2 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import io.netty.channel.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.GenericFutureListener; + +import java.io.Closeable; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public abstract class NettyClientBase implements Closeable { + private static final Log LOG = LogFactory.getLog(NettyClientBase.class); + private static final int CONNECTION_TIMEOUT = 60000; // 60 sec + private static final long PAUSE = 1000; // 1 sec + + private final int numRetries; + + private Bootstrap bootstrap; + private volatile ChannelFuture 
channelFuture; + + protected final Class<?> protocol; + protected final AtomicInteger sequence = new AtomicInteger(0); + + private final RpcConnectionKey key; + private final AtomicInteger counter = new AtomicInteger(0); // reference counter + + public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries) + throws ClassNotFoundException, NoSuchMethodException { + this.key = rpcConnectionKey; + this.protocol = rpcConnectionKey.protocolClass; + this.numRetries = numRetries; + } + + // should be called from sub class + protected void init(ChannelInitializer<Channel> initializer) { + this.bootstrap = new Bootstrap(); + this.bootstrap + .channel(NioSocketChannel.class) + .handler(initializer) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECTION_TIMEOUT) + .option(ChannelOption.SO_RCVBUF, 1048576 * 10) + .option(ChannelOption.TCP_NODELAY, true); + } + + public RpcConnectionPool.RpcConnectionKey getKey() { + return key; + } + + protected final Class<?> getServiceClass() throws ClassNotFoundException { + String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service"; + return Class.forName(serviceClassName); + } + + @SuppressWarnings("unchecked") + protected final <T> T getStub(Method stubMethod, Object rpcChannel) { + try { + return (T) stubMethod.invoke(null, rpcChannel); + } catch (Exception e) { + throw new RemoteException(e.getMessage(), e); + } + } + + public abstract <T> T getStub(); + + public boolean acquire(long timeout) { + if (!checkConnection(timeout)) { + return false; + } + counter.incrementAndGet(); + return true; + } + + public boolean release() { + return counter.decrementAndGet() == 0; + } + + private boolean checkConnection(long timeout) { + if (isConnected()) { + return true; + } + + InetSocketAddress addr = key.addr; + if (addr.isUnresolved()) { + addr = 
RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort()); + } + + return handleConnectionInternally(addr, timeout); + } + + private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) { + LOG.warn("Try to connect : " + address); + this.channelFuture = bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup()) + .connect(address) + .addListener(listener); + } + + // first attendant kicks connection + private final RpcUtils.Scrutineer<CountDownLatch> connect = new RpcUtils.Scrutineer<CountDownLatch>(); + + private boolean handleConnectionInternally(final InetSocketAddress addr, long timeout) { + final CountDownLatch ticket = new CountDownLatch(1); + final CountDownLatch granted = connect.check(ticket); + + // basically, it's double checked lock + if (ticket == granted && isConnected()) { + granted.countDown(); + return true; + } + + if (ticket == granted) { + connectUsingNetty(addr, new RetryConnectionListener(addr, granted)); + } + + try { + granted.await(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // ignore + } + + boolean success = channelFuture.isSuccess(); + + if (granted.getCount() == 0) { + connect.clear(granted); + } + + return success; + } + + class RetryConnectionListener implements GenericFutureListener<ChannelFuture> { + private final AtomicInteger retryCount = new AtomicInteger(); + private final InetSocketAddress address; + private final CountDownLatch latch; + + RetryConnectionListener(InetSocketAddress address, CountDownLatch latch) { + this.address = address; + this.latch = latch; + } + + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + if (!channelFuture.isSuccess()) { + channelFuture.channel().close(); + + if (numRetries > retryCount.getAndIncrement()) { + final GenericFutureListener<ChannelFuture> currentListener = this; + + RpcChannelFactory.getSharedClientEventloopGroup().schedule(new Runnable() { + @Override + 
public void run() { + connectUsingNetty(address, currentListener); + } + }, PAUSE, TimeUnit.MILLISECONDS); + + LOG.debug("Connecting to " + address + " has been failed. Retrying to connect."); + } + else { + latch.countDown(); + + LOG.error("Max retry count has been exceeded. attempts=" + numRetries); + } + } + else { + latch.countDown(); + } + } + } + + public Channel getChannel() { + return channelFuture == null ? null : channelFuture.channel(); + } + + public boolean isConnected() { + Channel channel = getChannel(); + return channel != null && channel.isOpen() && channel.isActive(); + } + + public SocketAddress getRemoteAddress() { + Channel channel = getChannel(); + return channel == null ? null : channel.remoteAddress(); + } + + @Override + public void close() { + Channel channel = getChannel(); + if (channel != null && channel.isOpen()) { + LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress()); + channel.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyRpcController.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyRpcController.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyRpcController.java new file mode 100644 index 0000000..b7f4537 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyRpcController.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; + +public class NettyRpcController implements RpcController { + private String errorText; + + @Override + public void reset() { + errorText = null; + } + + @Override + public boolean failed() { + return errorText != null; + } + + @Override + public String errorText() { + return errorText; + } + + @Override + public void startCancel() { + // TODO - to be implemented + throw new UnsupportedOperationException(); + } + + @Override + public void setFailed(String s) { + errorText = s; + } + + @Override + public boolean isCanceled() { + // TODO - to be implemented + return false; + } + + @Override + public void notifyOnCancel(RpcCallback<Object> objectRpcCallback) { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java new file mode 100644 index 0000000..9b7f8ac --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import com.google.protobuf.RpcCallback; + +public class NullCallback implements RpcCallback<Object> { + private final static NullCallback instance; + + static { + instance = new NullCallback(); + } + + public static RpcCallback get() { + return instance; + } + + @Override + public void run(Object parameter) { + // do nothing + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java new file mode 100644 index 0000000..6a340dc --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.rpc;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

import com.google.protobuf.MessageLite;

/**
 * Installs the varint32-framed protobuf codec on a channel, followed by the
 * application handler supplied at construction time.
 */
class ProtoChannelInitializer extends ChannelInitializer<Channel> {
  private final MessageLite prototype;
  private final ChannelHandler appHandler;

  /**
   * @param handler         handler added last in the pipeline
   * @param defaultInstance prototype message given to the protobuf decoder
   */
  public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance) {
    this.appHandler = handler;
    this.prototype = defaultInstance;
  }

  /** Adds frame decoder, protobuf decoder, frame encoder, protobuf encoder and handler, in that order. */
  @Override
  protected void initChannel(Channel channel) throws Exception {
    channel.pipeline()
        .addLast("frameDecoder", new ProtobufVarint32FrameDecoder())
        .addLast("protobufDecoder", new ProtobufDecoder(prototype))
        .addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender())
        .addLast("protobufEncoder", new ProtobufEncoder())
        .addLast("handler", appHandler);
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java new file mode 100644 index 0000000..52ef31a --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteCallException.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.rpc; + +import com.google.protobuf.Descriptors.MethodDescriptor; +import org.apache.tajo.rpc.RpcProtos.RpcResponse; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.io.Writer; + +public class RemoteCallException extends RemoteException { + private int seqId; + private String originExceptionClass; + + public RemoteCallException(int seqId, MethodDescriptor methodDesc, + Throwable t) { + super("Remote call error occurs when " + methodDesc.getFullName() + "is called:", t); + this.seqId = seqId; + if (t != null) { + originExceptionClass = t.getClass().getCanonicalName(); + } + } + + public RemoteCallException(int seqId, Throwable t) { + super(t); + this.seqId = seqId; + if (t != null) { + originExceptionClass = t.getClass().getCanonicalName(); + } + } + + public RpcResponse getResponse() { + RpcResponse.Builder builder = RpcResponse.newBuilder(); + builder.setId(seqId); + if (getCause().getMessage() == null) { + builder.setErrorMessage(getCause().getClass().getName()); + } else { + builder.setErrorMessage(getCause().getMessage()); + } + builder.setErrorTrace(getStackTraceString(getCause())); + builder.setErrorClass(originExceptionClass); + + return builder.build(); + } + + private static String getStackTraceString(Throwable aThrowable) { + final Writer result = new StringWriter(); + final PrintWriter printWriter = new PrintWriter(result); + aThrowable.printStackTrace(printWriter); + return result.toString(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteException.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RemoteException.java new file mode 100644 index 0000000..30c110d --- /dev/null +++ 
/**
 * Unchecked exception type for the RPC layer. Every constructor simply
 * forwards its arguments to the matching {@link RuntimeException} constructor.
 */
public class RemoteException extends RuntimeException {

  /**
   * Creates an exception with the given detail message and cause.
   *
   * @param message the detail message
   * @param t       the cause
   */
  public RemoteException(String message, Throwable t) {
    super(message, t);
  }

  /**
   * Creates an exception with the given cause.
   *
   * @param t the cause
   */
  public RemoteException(Throwable t) {
    super(t);
  }

  /**
   * Creates an exception with the given detail message.
   *
   * @param message the detail message
   */
  public RemoteException(String message) {
    super(message);
  }

  /** Creates an exception with neither a detail message nor a cause. */
  public RemoteException() {
    super();
  }
}
/**
 * Unchecked exception signalling that an operation was given up after a
 * number of attempts. Some constructors build the detail message from the
 * list of failures seen on the way.
 */
public class RetriesExhaustedException extends RuntimeException {
  private static final long serialVersionUID = 1876775844L;

  /**
   * @param msg the detail message
   */
  public RetriesExhaustedException(final String msg) {
    super(msg);
  }

  /**
   * @param msg the detail message
   * @param e   the I/O failure recorded as the cause
   */
  public RetriesExhaustedException(final String msg, final IOException e) {
    super(msg, e);
  }

  /**
   * Data structure that allows adding more info around a Throwable incident.
   */
  public static class ThrowableWithExtraContext {
    private final Throwable t;
    private final long when;
    private final String extras;

    /**
     * @param t      the throwable being described
     * @param when   timestamp in milliseconds, as accepted by {@link Date#Date(long)}
     * @param extras free-form text printed between the timestamp and the throwable
     */
    public ThrowableWithExtraContext(final Throwable t, final long when,
        final String extras) {
      this.t = t;
      this.when = when;
      this.extras = extras;
    }

    /**
     * @return the timestamp, the extra text and the throwable, separated by ", ";
     *         a {@code null} throwable is printed as "null"
     */
    @Override
    public String toString() {
      // String.valueOf keeps a null throwable from breaking the description.
      return new Date(this.when).toString() + ", " + extras + ", " + String.valueOf(t);
    }
  }

  /**
   * Create a new RetriesExhaustedException from the list of prior failures.
   *
   * @param callableVitals text describing what was being contacted; it is
   *                       inserted into the message after "Failed contacting "
   * @param numTries       the number of tries we made; the message reports
   *                       {@code numTries + 1} attempts
   * @param exceptions     list of exceptions that failed before giving up;
   *                       {@code null} is treated as an empty list
   */
  public RetriesExhaustedException(final String callableVitals, int numTries,
      List<Throwable> exceptions) {
    super(getMessage(callableVitals, numTries, exceptions));
  }

  /**
   * Create a new RetriesExhaustedException from the list of prior failures.
   *
   * @param numTries   the number of tries we made; the message reports
   *                   {@code numTries + 1} attempts
   * @param exceptions list of exceptions that failed before giving up;
   *                   {@code null} is treated as an empty list
   */
  public RetriesExhaustedException(final int numTries,
      final List<Throwable> exceptions) {
    super(getMessage(numTries, exceptions));
  }

  /** Builds the "Failed contacting ..." message used by the three-argument constructor. */
  private static String getMessage(String callableVitals, int numTries,
      List<Throwable> exceptions) {
    StringBuilder buffer = new StringBuilder("Failed contacting ");
    buffer.append(callableVitals);
    buffer.append(" after ");
    buffer.append(numTries + 1);
    buffer.append(" attempts.\nExceptions:\n");
    appendExceptions(buffer, exceptions);
    return buffer.toString();
  }

  /** Builds the "Failed after attempts=..." message used by the two-argument constructor. */
  private static String getMessage(final int numTries,
      final List<Throwable> exceptions) {
    StringBuilder buffer = new StringBuilder("Failed after attempts=");
    buffer.append(numTries + 1);
    buffer.append(", exceptions:\n");
    appendExceptions(buffer, exceptions);
    return buffer.toString();
  }

  /**
   * Appends one line per throwable. A {@code null} list appends nothing and a
   * {@code null} element is printed as "null", so that building the message
   * never hides the retry failure behind a {@link NullPointerException}.
   */
  private static void appendExceptions(StringBuilder buffer, List<Throwable> exceptions) {
    if (exceptions == null) {
      return;
    }
    for (Throwable t : exceptions) {
      buffer.append(String.valueOf(t));
      buffer.append("\n");
    }
  }
}
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import io.netty.channel.ConnectTimeoutException; +import io.netty.util.internal.logging.CommonsLoggerFactory; +import io.netty.util.internal.logging.InternalLoggerFactory; + +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; + +public class RpcConnectionPool { + private static final Log LOG = LogFactory.getLog(RpcConnectionPool.class); + + private Map<RpcConnectionKey, NettyClientBase> connections = + new HashMap<RpcConnectionKey, NettyClientBase>(); + + private static RpcConnectionPool instance; + private final Object lockObject = new Object(); + + public final static int RPC_RETRIES = 3; + + private RpcConnectionPool() { + } + + public synchronized static RpcConnectionPool getPool() { + if(instance == null) { + InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory()); + instance = new RpcConnectionPool(); + } + return instance; + } + + private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey) + throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { + 
NettyClientBase client; + if(rpcConnectionKey.asyncMode) { + client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES); + } else { + client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES); + } + return client; + } + + public static final long DEFAULT_TIMEOUT = 3000; + public static final long DEFAULT_INTERVAL = 500; + + public NettyClientBase getConnection(InetSocketAddress addr, + Class<?> protocolClass, boolean asyncMode) + throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { + return getConnection(addr, protocolClass, asyncMode, DEFAULT_TIMEOUT, DEFAULT_INTERVAL); + } + + public NettyClientBase getConnection(InetSocketAddress addr, + Class<?> protocolClass, boolean asyncMode, long timeout, long interval) + throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { + RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode); + + RpcUtils.Timer timer = new RpcUtils.Timer(timeout); + for (; !timer.isTimedOut(); timer.elapsed()) { + NettyClientBase client; + synchronized (lockObject) { + client = connections.get(key); + if (client == null) { + connections.put(key, client = makeConnection(key)); + } + } + if (client.acquire(timer.remaining())) { + return client; + } + timer.interval(interval); + } + + throw new ConnectTimeoutException("Failed to get connection for " + timeout + " msec"); + } + + public void releaseConnection(NettyClientBase client) { + release(client, false); + } + + public void closeConnection(NettyClientBase client) { + release(client, true); + } + + private void release(NettyClientBase client, boolean close) { + if (client == null) { + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Close connection [" + client.getKey() + "]"); + } + try { + if (returnToPool(client, close)) { + client.close(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Current Connections [" + connections.size() + "]"); + } + } catch (Exception e) { + LOG.error("Can't close connection:" + 
client.getKey() + ":" + e.getMessage(), e); + } + } + + // return true if the connection should be closed + private boolean returnToPool(NettyClientBase client, boolean close) { + synchronized (lockObject) { + if (client.release() && (close || !client.isConnected())) { + connections.remove(client.getKey()); + return true; + } + } + return false; + } + + public void close() { + if(LOG.isDebugEnabled()) { + LOG.debug("Pool Closed"); + } + + synchronized (lockObject) { + for (NettyClientBase eachClient : connections.values()) { + try { + eachClient.close(); + } catch (Exception e) { + LOG.error("close client pool error", e); + } + } + connections.clear(); + } + } + + public void shutdown(){ + close(); + RpcChannelFactory.shutdownGracefully(); + } + + static class RpcConnectionKey { + final InetSocketAddress addr; + final Class<?> protocolClass; + final boolean asyncMode; + + final String description; + + public RpcConnectionKey(InetSocketAddress addr, + Class<?> protocolClass, boolean asyncMode) { + this.addr = addr; + this.protocolClass = protocolClass; + this.asyncMode = asyncMode; + this.description = "["+ protocolClass + "] " + addr + "," + asyncMode; + } + + @Override + public String toString() { + return description; + } + + @Override + public boolean equals(Object obj) { + if(!(obj instanceof RpcConnectionKey)) { + return false; + } + + return toString().equals(obj.toString()); + } + + @Override + public int hashCode() { + return description.hashCode(); + } + } +}
