Repository: tajo Updated Branches: refs/heads/index_support 2fe3db4c5 -> e6b91c333
http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java index 57e436b..190beae 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java @@ -18,40 +18,49 @@ package org.apache.tajo.rpc; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.ServiceException; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.concurrent.GenericFutureListener; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey; +import org.apache.tajo.rpc.RpcProtos.RpcResponse; import java.io.Closeable; import java.lang.reflect.Method; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; -public abstract class NettyClientBase implements Closeable { - private static final Log LOG = LogFactory.getLog(NettyClientBase.class); - private static final int CONNECTION_TIMEOUT = 60000; // 60 sec - private static final long PAUSE = 1000; // 1 sec - - private final int numRetries; +public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable { + public final static Log LOG = LogFactory.getLog(NettyClientBase.class); private Bootstrap bootstrap; private volatile ChannelFuture channelFuture; - - protected final Class<?> protocol; - protected final AtomicInteger sequence = new AtomicInteger(0); - private final RpcConnectionKey key; + private final int maxRetries; + private boolean enableMonitor; + + private final ConcurrentMap<RpcConnectionKey, ChannelEventListener> channelEventListeners = + new ConcurrentHashMap<RpcConnectionKey, ChannelEventListener>(); + private final ConcurrentMap<Integer, T> requests = new ConcurrentHashMap<Integer, T>(); public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries) throws ClassNotFoundException, NoSuchMethodException { this.key = rpcConnectionKey; - this.protocol = rpcConnectionKey.protocolClass; - this.numRetries = numRetries; + this.maxRetries = numRetries; } // should be called from sub class @@ -59,13 +68,13 @@ public abstract class NettyClientBase implements Closeable { this.bootstrap = new Bootstrap(); this.bootstrap .group(RpcChannelFactory.getSharedClientEventloopGroup()) - .channel(NioSocketChannel.class) - .handler(initializer) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECTION_TIMEOUT) - .option(ChannelOption.SO_RCVBUF, 1048576 * 10) - .option(ChannelOption.TCP_NODELAY, true); + .channel(NioSocketChannel.class) + .handler(initializer) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, RpcConstants.DEFAULT_CONNECT_TIMEOUT) + .option(ChannelOption.SO_RCVBUF, 1048576 * 10) + .option(ChannelOption.TCP_NODELAY, true); } public RpcClientManager.RpcConnectionKey getKey() { @@ -73,23 +82,81 @@ public abstract class NettyClientBase implements Closeable { } protected final Class<?> getServiceClass() throws ClassNotFoundException { - String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service"; + String serviceClassName = getKey().protocolClass.getName() + "$" + + getKey().protocolClass.getSimpleName() + "Service"; return Class.forName(serviceClassName); } @SuppressWarnings("unchecked") - protected final <T> T getStub(Method stubMethod, Object rpcChannel) { + protected final <I> I getStub(Method stubMethod, Object rpcChannel) { try { - return (T) stubMethod.invoke(null, rpcChannel); + return (I) stubMethod.invoke(null, rpcChannel); } catch (Exception e) { throw new RemoteException(e.getMessage(), e); } } - public abstract <T> T getStub(); + protected static RpcProtos.RpcRequest buildRequest(int seqId, + Descriptors.MethodDescriptor method, + Message param) { + RpcProtos.RpcRequest.Builder requestBuilder = RpcProtos.RpcRequest.newBuilder() + .setId(seqId) + .setMethodName(method.getName()); + + if (param != null) { + requestBuilder.setRequestMessage(param.toByteString()); + } + + return requestBuilder.build(); + } + + /** + * Repeat invoke rpc request until the connection attempt succeeds or exceeded retries + */ + protected void invoke(final RpcProtos.RpcRequest rpcRequest, final T callback, final int retry) { + + ChannelPromise promise = getChannel().newPromise(); + promise.addListener(new GenericFutureListener<ChannelFuture>() { + @Override + public void operationComplete(final ChannelFuture future) throws Exception { - private InetSocketAddress resolveAddress(InetSocketAddress address) { + if (future.isSuccess()) { + + getHandler().registerCallback(rpcRequest.getId(), callback); + } else { + + if (!future.channel().isActive() && retry < maxRetries) { + + /* schedule the current request for the retry */ + LOG.warn(future.cause() + " Try to reconnect :" + getKey().addr); + + final EventLoop loop = future.channel().eventLoop(); + loop.schedule(new Runnable() { + @Override + public void run() { + doConnect(getKey().addr).addListener(new GenericFutureListener<ChannelFuture>() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + invoke(rpcRequest, callback, retry + 1); + } + }); + } + }, RpcConstants.DEFAULT_PAUSE, TimeUnit.MILLISECONDS); + } else { + + /* Max retry count has been exceeded or internal failure */ + getHandler().registerCallback(rpcRequest.getId(), callback); + getHandler().exceptionCaught(getChannel().pipeline().lastContext(), + new RecoverableException(rpcRequest.getId(), future.cause())); + } + } + } + }); + getChannel().writeAndFlush(rpcRequest, promise); + } + + private static InetSocketAddress resolveAddress(InetSocketAddress address) { if (address.isUnresolved()) { return RpcUtils.createSocketAddr(address.getHostName(), address.getPort()); } @@ -100,11 +167,10 @@ public abstract class NettyClientBase implements Closeable { return this.channelFuture = bootstrap.clone().connect(address); } - - public synchronized void connect() throws ConnectTimeoutException { + public synchronized void connect() throws ConnectException { if (isConnected()) return; - final AtomicInteger retries = new AtomicInteger(); + int retries = 0; InetSocketAddress address = key.addr; if (address.isUnresolved()) { address = resolveAddress(address); @@ -112,22 +178,26 @@ public abstract class NettyClientBase implements Closeable { /* do not call await() inside handler */ ChannelFuture f = doConnect(address).awaitUninterruptibly(); - retries.incrementAndGet(); - if (!f.isSuccess() && numRetries > 0) { - doReconnect(address, f, retries); + if (!f.isSuccess()) { + if (maxRetries > 0) { + doReconnect(address, f, ++retries); + } else { + throw new ConnectException(ExceptionUtils.getMessage(f.cause())); + } } } - private void doReconnect(final InetSocketAddress address, ChannelFuture future, AtomicInteger retries) - throws ConnectTimeoutException { + private void doReconnect(final InetSocketAddress address, ChannelFuture future, int retries) + throws ConnectException { for (; ; ) { - if (numRetries >= retries.getAndIncrement()) { + if (maxRetries > retries) { + retries++; - LOG.warn(future.cause().getMessage() + " Try to reconnect"); + LOG.warn(future.cause() + " Try to reconnect : " + getKey().addr); try { - Thread.sleep(PAUSE); + Thread.sleep(RpcConstants.DEFAULT_PAUSE); } catch (InterruptedException e) { } @@ -136,19 +206,21 @@ public abstract class NettyClientBase implements Closeable { break; } } else { - throw new ConnectTimeoutException("Max retry count has been exceeded. attempts=" + numRetries + throw new ConnectTimeoutException("Max retry count has been exceeded. attempts=" + retries + " caused by: " + future.cause()); } } } + protected abstract NettyChannelInboundHandler getHandler(); + public Channel getChannel() { return channelFuture == null ? null : channelFuture.channel(); } public boolean isConnected() { Channel channel = getChannel(); - return channel != null && channel.isOpen() && channel.isActive(); + return channel != null && channel.isActive(); } public SocketAddress getRemoteAddress() { @@ -156,12 +228,165 @@ public abstract class NettyClientBase implements Closeable { return channel == null ? null : channel.remoteAddress(); } + public int getActiveRequests() { + return requests.size(); + } + + public boolean subscribeEvent(RpcConnectionKey key, ChannelEventListener listener) { + return channelEventListeners.putIfAbsent(key, listener) == null; + } + + public void removeSubscribers() { + channelEventListeners.clear(); + } + + public Collection<ChannelEventListener> getSubscribers() { + return channelEventListeners.values(); + } + + private String getErrorMessage(String message) { + return "Exception [" + getKey().protocolClass.getCanonicalName() + + "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress) + getChannel().remoteAddress()) + ")]: " + message; + } + @Override public void close() { + getHandler().sendExceptions(getClass().getSimpleName() + "terminates all the connections"); + Channel channel = getChannel(); if (channel != null && channel.isOpen()) { LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress()); - channel.close().awaitUninterruptibly(); + channel.close().syncUninterruptibly(); + } + } + + protected abstract class NettyChannelInboundHandler extends SimpleChannelInboundHandler<RpcResponse> { + + protected void registerCallback(int seqId, T callback) { + if (requests.putIfAbsent(seqId, callback) != null) { + throw new RemoteException( + getErrorMessage("Duplicate Sequence Id " + seqId)); + } + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + MonitorClientHandler handler = ctx.pipeline().get(MonitorClientHandler.class); + if (handler != null) { + enableMonitor = true; + } + + for (ChannelEventListener listener : getSubscribers()) { + listener.channelRegistered(ctx); + } + super.channelRegistered(ctx); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + for (ChannelEventListener listener : getSubscribers()) { + listener.channelUnregistered(ctx); + } + super.channelUnregistered(ctx); + + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + LOG.debug("Connection established successfully : " + ctx.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + sendExceptions("Connection lost :" + getKey().addr); + } + + @Override + protected final void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception { + T callback = requests.remove(response.getId()); + if (callback == null) + LOG.warn("Dangling rpc call"); + else run(response, callback); + } + + /** + * A {@link #channelRead0} received a message. + * @param response response proto of type {@link RpcResponse}. + * @param callback callback of type {@link T}. + * @throws Exception + */ + protected abstract void run(RpcResponse response, T callback) throws Exception; + + /** + * Calls from exceptionCaught + * @param requestId sequence id of request. + * @param callback callback of type {@link T}. + * @param message the error message to handle + */ + protected abstract void handleException(int requestId, T callback, String message); + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + + Throwable rootCause = ExceptionUtils.getRootCause(cause); + LOG.error(getKey().addr + "," + getKey().protocolClass + "," + ExceptionUtils.getMessage(rootCause), rootCause); + + if (cause instanceof RecoverableException) { + sendException((RecoverableException) cause); + } else { + /* unrecoverable fatal error*/ + sendExceptions(ExceptionUtils.getMessage(rootCause)); + if (ctx.channel().isOpen()) { + ctx.close(); + } + } + } + + /** + * Send an error to all callback + */ + private void sendExceptions(String message) { + for (int requestId : requests.keySet()) { + handleException(requestId, requests.remove(requestId), message); + } + } + + /** + * Send an error to callback + */ + private void sendException(RecoverableException e) { + T callback = requests.remove(e.getSeqId()); + + if (callback != null) { + handleException(e.getSeqId(), callback, ExceptionUtils.getRootCauseMessage(e)); + } + } + + /** + * Trigger timeout event + */ + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + + if (!enableMonitor && evt instanceof IdleStateEvent) { + IdleStateEvent e = (IdleStateEvent) evt; + /* If all requests is done and event is triggered, idle channel close. */ + if (e.state() == IdleState.READER_IDLE && requests.isEmpty()) { + ctx.close(); + LOG.info("Idle connection closed successfully :" + ctx.channel()); + } + } else if (evt instanceof MonitorStateEvent) { + MonitorStateEvent e = (MonitorStateEvent) evt; + if (e.state() == MonitorStateEvent.MonitorState.PING_EXPIRED) { + exceptionCaught(ctx, new ServiceException("Server has not respond: " + ctx.channel())); + } + } + + super.userEventTriggered(ctx, evt); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java deleted file mode 100644 index 74eb650..0000000 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import com.google.protobuf.MessageLite; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.handler.codec.protobuf.ProtobufDecoder; -import io.netty.handler.codec.protobuf.ProtobufEncoder; -import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; -import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; -import io.netty.handler.timeout.IdleStateHandler; - -class ProtoChannelInitializer extends ChannelInitializer<Channel> { - private final MessageLite defaultInstance; - private final ChannelHandler handler; - private final int idleTimeSeconds; - - public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance) { - this(handler, defaultInstance, 0); - } - - public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance, int idleTimeSeconds) { - this.handler = handler; - this.defaultInstance = defaultInstance; - this.idleTimeSeconds = idleTimeSeconds; - } - - @Override - protected void initChannel(Channel channel) throws Exception { - ChannelPipeline pipeline = channel.pipeline(); - pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder()); - pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance)); - pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender()); - pipeline.addLast("protobufEncoder", new ProtobufEncoder()); - pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTimeSeconds)); //zero is disabling - pipeline.addLast("handler", handler); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java new file mode 100644 index 0000000..8787dee --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import com.google.protobuf.MessageLite; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufEncoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import io.netty.handler.timeout.IdleStateHandler; + +import java.util.concurrent.TimeUnit; + +class ProtoClientChannelInitializer extends ChannelInitializer<Channel> { + private final MessageLite defaultInstance; + private final ChannelHandler handler; + private final long timeoutTimeNanos; + private final boolean enablePing; + + public ProtoClientChannelInitializer(ChannelHandler handler, MessageLite defaultInstance, + long timeoutTimeNanos, + boolean enablePing) { + this.handler = handler; + this.defaultInstance = defaultInstance; + this.timeoutTimeNanos = timeoutTimeNanos; + this.enablePing = enablePing; + } + + @Override + protected void initChannel(Channel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast("idleStateHandler", + new IdleStateHandler(timeoutTimeNanos, timeoutTimeNanos / 2, 0, TimeUnit.NANOSECONDS)); + + if (enablePing) pipeline.addLast("MonitorClientHandler", new MonitorClientHandler()); + + pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder()); + pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance)); + pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender()); + pipeline.addLast("protobufEncoder", new ProtobufEncoder()); + pipeline.addLast("handler", handler); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoDeclaration.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoDeclaration.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoDeclaration.java new file mode 100644 index 0000000..91d331e --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoDeclaration.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +interface ProtoDeclaration { + + <I> I getStub(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoServerChannelInitializer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoServerChannelInitializer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoServerChannelInitializer.java new file mode 100644 index 0000000..3e2fe0d --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoServerChannelInitializer.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import com.google.protobuf.MessageLite; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufEncoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; + +class ProtoServerChannelInitializer extends ChannelInitializer<Channel> { + private final MessageLite defaultInstance; + private final ChannelHandler handler; + + public ProtoServerChannelInitializer(ChannelHandler handler, MessageLite defaultInstance) { + this.handler = handler; + this.defaultInstance = defaultInstance; + } + + @Override + protected void initChannel(Channel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast("MonitorServerHandler", new MonitorServerHandler()); + pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder()); + pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance)); + pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender()); + pipeline.addLast("protobufEncoder", new ProtobufEncoder()); + pipeline.addLast("handler", handler); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RecoverableException.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RecoverableException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RecoverableException.java new file mode 100644 index 0000000..1d22663 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RecoverableException.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +/* RecoverableException can be handle a failure request */ +public class RecoverableException extends RemoteException { + private int seqId; + + public RecoverableException(int seqId, String message) { + super(message); + this.seqId = seqId; + } + + public RecoverableException(int seqId, Throwable t) { + this(seqId, t.getMessage(), t); + } + + public RecoverableException(int seqId, String message, Throwable t) { + super(message, t); + this.seqId = seqId; + } + + public int getSeqId() { + return seqId; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java index f05fb97..f8def7f 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java @@ -18,26 +18,28 @@ package org.apache.tajo.rpc; -import io.netty.channel.ConnectTimeoutException; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.internal.logging.CommonsLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import javax.annotation.concurrent.ThreadSafe; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; @ThreadSafe public class RpcClientManager { private static final Log LOG = LogFactory.getLog(RpcClientManager.class); - public static final int RPC_RETRIES = 3; - - /* If all requests is done and client is idle state, client will be removed. */ - public static final int RPC_IDLE_TIMEOUT = 43200; // 12 hour + private volatile int timeoutSeconds = RpcConstants.DEFAULT_RPC_TIMEOUT_SECONDS; + private volatile int retries = RpcConstants.DEFAULT_RPC_RETRIES; /* entries will be removed by ConnectionCloseFutureListener */ private static final Map<RpcConnectionKey, NettyClientBase> @@ -57,45 +59,86 @@ public class RpcClientManager { return instance; } - private NettyClientBase makeClient(RpcConnectionKey rpcConnectionKey) - throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { + private <T extends NettyClientBase> T makeClient(RpcConnectionKey rpcConnectionKey, + int retries, + long timeout, + TimeUnit timeUnit, + boolean enablePing) + throws NoSuchMethodException, ClassNotFoundException, ConnectException { NettyClientBase client; if (rpcConnectionKey.asyncMode) { - client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES, RPC_IDLE_TIMEOUT); + client = new AsyncRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing); } else { - client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES, RPC_IDLE_TIMEOUT); + client = new BlockingRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing); } - return client; + return (T) client; } /** * Connect a {@link NettyClientBase} to the remote {@link NettyServerBase}, and returns rpc client by protocol. * This client will be shared per protocol and address. Client is removed in shared map when a client is closed - * @param addr - * @param protocolClass - * @param asyncMode - * @return - * @throws NoSuchMethodException - * @throws ClassNotFoundException - * @throws ConnectTimeoutException */ - public NettyClientBase getClient(InetSocketAddress addr, - Class<?> protocolClass, boolean asyncMode) - throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { + public <T extends NettyClientBase> T getClient(InetSocketAddress addr, + Class<?> protocolClass, boolean asyncMode) + throws NoSuchMethodException, ClassNotFoundException, ConnectException { RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode); NettyClientBase client; synchronized (clients) { client = clients.get(key); if (client == null) { - clients.put(key, client = makeClient(key)); + clients.put(key, client = makeClient(key, retries, getTimeoutSeconds(), TimeUnit.SECONDS, true)); } } if (!client.isConnected()) { - client.connect(); - client.getChannel().closeFuture().addListener(new ConnectionCloseFutureListener(key)); + + final NettyClientBase target = client; + client.subscribeEvent(target.getKey(), new ChannelEventListener() { + @Override + public void channelRegistered(ChannelHandlerContext ctx) { + /* Register client to managed map */ + clients.put(target.getKey(), target); + target.getChannel().closeFuture().addListener(new ClientCloseFutureListener(target.getKey())); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) { + // nothing to do + } + }); } + + client.connect(); + assert client.isConnected(); + return (T) client; + } + + /** + * Connect a {@link NettyClientBase} to the remote {@link NettyServerBase}, and returns rpc client by protocol. + * This client does not managed. It should close. + */ + public synchronized <T extends NettyClientBase> T newClient(InetSocketAddress addr, + Class<?> protocolClass, + boolean asyncMode, + int retries, + long timeout, + TimeUnit timeUnit, + boolean enablePing) + throws NoSuchMethodException, ClassNotFoundException, ConnectException { + + return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode), retries, timeout, timeUnit, enablePing); + } + + public synchronized <T extends NettyClientBase> T newClient(RpcConnectionKey key, + int retries, + long timeout, + TimeUnit timeUnit, + boolean enablePing) + throws NoSuchMethodException, ClassNotFoundException, ConnectException { + + T client = makeClient(key, retries, timeout, timeUnit, enablePing); + client.connect(); assert client.isConnected(); return client; } @@ -125,11 +168,6 @@ public class RpcClientManager { RpcChannelFactory.shutdownGracefully(); } - protected static NettyClientBase remove(RpcConnectionKey key) { - LOG.debug("Removing shared rpc client :" + key); - return clients.remove(key); - } - protected static boolean contains(RpcConnectionKey key) { return clients.containsKey(key); } @@ -148,6 +186,22 @@ public class RpcClientManager { } } + public int getTimeoutSeconds() { + return timeoutSeconds; + } + + public void setTimeoutSeconds(int timeoutSeconds) { + this.timeoutSeconds = timeoutSeconds; + } + + public int getRetries() { + return retries; + } + + public void setRetries(int retries) { + this.retries = retries; + } + static class RpcConnectionKey { final InetSocketAddress addr; final Class<?> protocolClass; @@ -182,4 +236,17 @@ public class RpcClientManager { return description.hashCode(); } } + + static class ClientCloseFutureListener implements GenericFutureListener { + private RpcClientManager.RpcConnectionKey key; + + public ClientCloseFutureListener(RpcClientManager.RpcConnectionKey key) { + this.key = key; + } + + @Override + public void operationComplete(Future future) throws Exception { + clients.remove(key); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto index 824ea38..35cc945 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/proto/TestProtocol.proto @@ -28,6 +28,7 @@ service DummyProtocolService { rpc echo (EchoMessage) returns (EchoMessage); rpc getError (EchoMessage) returns (EchoMessage); rpc getNull (EchoMessage) returns (EchoMessage); - rpc deley (EchoMessage) returns (EchoMessage); + rpc delay (EchoMessage) returns (EchoMessage); + rpc busy (EchoMessage) returns (EchoMessage); rpc throwException (EchoMessage) returns (EchoMessage); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java index 68f170c..f8c6d32 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java @@ -39,9 +39,11 @@ import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -59,17 +61,20 @@ public class TestAsyncRpc { Interface stub; DummyProtocolAsyncImpl service; int retries; - + RpcClientManager.RpcConnectionKey rpcConnectionKey; + RpcClientManager manager = RpcClientManager.getInstance(); + @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface SetupRpcConnection { boolean setupRpcServer() default true; + boolean setupRpcClient() default true; } - + @Rule public ExternalResource resource = new ExternalResource() { - + private Description description; @Override @@ -85,7 +90,7 @@ public class TestAsyncRpc { if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { setUpRpcServer(); } - + if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { setUpRpcClient(); } @@ -102,7 +107,7 @@ public class TestAsyncRpc { fail(e.getMessage()); } } - + if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { try { tearDownRpcServer(); @@ -111,25 +116,24 @@ public class TestAsyncRpc { } } } - + }; - + public void setUpRpcServer() throws Exception { service = new DummyProtocolAsyncImpl(); server = new AsyncRpcServer(DummyProtocol.class, - service, new InetSocketAddress("127.0.0.1", 0), 2); + service, new InetSocketAddress("127.0.0.1", 0), 3); server.start(); } - + public void setUpRpcClient() throws Exception { retries = 1; - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey( - RpcUtils.getConnectAddress(server.getListenAddress()), - DummyProtocol.class, true); - client = new AsyncRpcClient(rpcConnectionKey, retries); - client.connect(); + rpcConnectionKey = new RpcClientManager.RpcConnectionKey( + RpcUtils.getConnectAddress(server.getListenAddress()), + DummyProtocol.class, true); + client = manager.newClient(rpcConnectionKey, retries, 10, TimeUnit.SECONDS, true); + assertTrue(client.isConnected()); stub = client.getStub(); } @@ -137,16 +141,16 @@ public class TestAsyncRpc { public static void tearDownClass() throws Exception { RpcChannelFactory.shutdownGracefully(); } - + public void tearDownRpcServer() throws Exception { - if(server != null) { + if (server != null) { server.shutdown(); server = null; } } - + public void tearDownRpcClient() throws Exception { - if(client != null) { + if (client != null) { client.close(); client = null; } @@ -172,35 +176,37 @@ public class TestAsyncRpc { }); + final CountDownLatch barrier = new CountDownLatch(1); EchoMessage echoMessage = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); RpcCallback<EchoMessage> callback = new RpcCallback<EchoMessage>() { @Override public void run(EchoMessage parameter) { + assertNotNull(parameter); echo = parameter.getMessage(); assertEquals(MESSAGE, echo); calledMarker = true; + barrier.countDown(); } }; stub.echo(null, echoMessage, callback); - Thread.sleep(1000); + + assertTrue(barrier.await(1000, TimeUnit.MILLISECONDS)); assertTrue(calledMarker); } - private CountDownLatch testNullLatch; - @Test public void testGetNull() throws Exception { - testNullLatch = new CountDownLatch(1); + final CountDownLatch barrier = new CountDownLatch(1); stub.getNull(null, null, new RpcCallback<EchoMessage>() { @Override public void run(EchoMessage parameter) { assertNull(parameter); LOG.info("testGetNull retrieved"); - testNullLatch.countDown(); + barrier.countDown(); } }); - assertTrue(testNullLatch.await(1000, TimeUnit.MILLISECONDS)); + assertTrue(barrier.await(1000, TimeUnit.MILLISECONDS)); assertTrue(service.getNullCalled); } @@ -209,70 +215,112 @@ public class TestAsyncRpc { EchoMessage echoMessage = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); - stub.deley(null, echoMessage, future); + stub.delay(future.getController(), echoMessage, future); assertFalse(future.isDone()); - assertEquals(future.get(), echoMessage); + assertEquals(echoMessage, future.get()); assertTrue(future.isDone()); } @Test public void testCallFutureTimeout() throws Exception { boolean timeout = false; + CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + EchoMessage echoMessage = EchoMessage.newBuilder().setMessage(MESSAGE).build(); try { - EchoMessage echoMessage = EchoMessage.newBuilder() - .setMessage(MESSAGE).build(); - CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); - stub.deley(null, echoMessage, future); - + stub.delay(future.getController(), echoMessage, future); assertFalse(future.isDone()); - future.get(1, TimeUnit.SECONDS); + future.get(100, TimeUnit.MILLISECONDS); } catch (TimeoutException te) { timeout = true; } + assertFalse(future.getController().failed()); assertTrue(timeout); } @Test - public void testCallFutureDisconnected() throws Exception { + public void testThrowException() throws Exception { EchoMessage echoMessage = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + stub.throwException(future.getController(), echoMessage, future); - tearDownRpcServer(); - - stub.echo(future.getController(), echoMessage, future); - EchoMessage response = future.get(); + assertFalse(future.isDone()); + EchoMessage result = null; + try { + result = future.get(); + } catch (ExecutionException e) { + } - assertNull(response); + assertEquals(null, result); + assertTrue(future.isDone()); assertTrue(future.getController().failed()); - assertTrue(future.getController().errorText() != null); + assertNotNull(future.getController().errorText()); } @Test - public void testStubDisconnected() throws Exception { + public void testThrowException2() throws Exception { + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + stub.throwException(null, echoMessage, future); + + assertFalse(future.isDone()); + assertNull(future.get()); + + assertTrue(future.isDone()); + assertFalse(future.getController().failed()); + assertNull(future.getController().errorText()); + } + @Test(timeout = 60000) + public void testServerShutdown1() throws Exception { EchoMessage echoMessage = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); - if (server != null) { - server.shutdown(true); - server = null; + tearDownRpcServer(); + + stub.echo(future.getController(), echoMessage, future); + + EchoMessage result = null; + try { + result = future.get(); + } catch (ExecutionException e) { } - stub = client.getStub(); + assertEquals(null, result); + assertTrue(future.isDone()); + assertTrue(future.getController().failed()); + assertNotNull(future.getController().errorText(), future.getController().errorText()); + } + + @Test(timeout = 60000) + public void testServerShutdown2() throws Exception { + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + + tearDownRpcServer(); + + Interface stub = client.getStub(); stub.echo(future.getController(), echoMessage, future); - EchoMessage response = future.get(); - assertNull(response); + EchoMessage result = null; + try { + result = future.get(); + } catch (ExecutionException e) { + } + + assertEquals(null, result); + assertTrue(future.isDone()); assertTrue(future.getController().failed()); - assertTrue(future.getController().errorText() != null); + assertNotNull(future.getController().errorText(), future.getController().errorText()); } @Test - @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false) - public void testConnectionRetry() throws Exception { + @SetupRpcConnection(setupRpcServer = false, setupRpcClient = false) + public void testClientRetryOnStartup() throws Exception { retries = 10; ServerSocket serverSocket = new ServerSocket(0); final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort()); @@ -300,94 +348,228 @@ public class TestAsyncRpc { serverThread.start(); RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true); - client = new AsyncRpcClient(rpcConnectionKey, retries); - client.connect(); + new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true); + AsyncRpcClient client = manager.newClient(rpcConnectionKey, + retries, 0, TimeUnit.MILLISECONDS, false); assertTrue(client.isConnected()); - stub = client.getStub(); + + Interface stub = client.getStub(); stub.echo(future.getController(), echoMessage, future); assertFalse(future.isDone()); assertEquals(echoMessage, future.get()); assertTrue(future.isDone()); + client.close(); + server.shutdown(); } - @Test - public void testConnectionFailure() throws Exception { + + @Test(timeout = 60000) + @SetupRpcConnection(setupRpcServer = false, setupRpcClient = false) + public void testClientRetryFailureOnStartup() throws Exception { + retries = 2; + ServerSocket serverSocket = new ServerSocket(0); + final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort()); + serverSocket.close(); + service = new DummyProtocolAsyncImpl(); + + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true); + AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries); + try { + client.connect(); + fail(); + } catch (ConnectTimeoutException e) { + assertFalse(e.getMessage(), client.isConnected()); + } + + stub = client.getStub(); + stub.echo(future.getController(), echoMessage, future); + + EchoMessage result = null; + try { + result = future.get(); + } catch (ExecutionException e) { + } + + assertEquals(null, result); + assertTrue(future.isDone()); + assertTrue(future.getController().failed()); + assertNotNull(future.getController().errorText(), future.getController().errorText()); + } + + @Test(timeout = 60000) + @SetupRpcConnection(setupRpcServer = false, setupRpcClient = false) + public void testUnresolvedAddress() throws Exception { InetSocketAddress address = new InetSocketAddress("test", 0); boolean expected = false; + AsyncRpcClient client = null; try { RpcClientManager.RpcConnectionKey rpcConnectionKey = new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true); - NettyClientBase client = new AsyncRpcClient(rpcConnectionKey, retries); + client = new AsyncRpcClient(rpcConnectionKey, retries); client.connect(); fail(); - } catch (ConnectTimeoutException e) { + } catch (ConnectException e) { expected = true; } catch (Throwable throwable) { - fail(); + fail(throwable.getMessage()); + } finally { + client.close(); } assertTrue(expected); } - @Test - @SetupRpcConnection(setupRpcClient=false) - public void testUnresolvedAddress() throws Exception { + @Test(timeout = 60000) + @SetupRpcConnection(setupRpcClient = false) + public void testUnresolvedAddress2() throws Exception { String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey( - RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true); - client = new AsyncRpcClient(rpcConnectionKey, retries); + new RpcClientManager.RpcConnectionKey( + RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true); + AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries); client.connect(); - assertTrue(client.isConnected()); - Interface stub = client.getStub(); + try { + assertTrue(client.isConnected()); + Interface stub = client.getStub(); + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + stub.echo(future.getController(), echoMessage, future); + + assertFalse(future.isDone()); + assertEquals(echoMessage, future.get()); + assertTrue(future.isDone()); + } finally { + client.close(); + } + } + + @Test(timeout = 60000) + @SetupRpcConnection(setupRpcClient = false) + public void testStubRecovery() throws Exception { + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); + AsyncRpcClient client = manager.newClient(rpcConnectionKey, 2, 0, TimeUnit.MILLISECONDS, false); + EchoMessage echoMessage = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); - CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); - stub.deley(null, echoMessage, future); + int repeat = 5; - assertFalse(future.isDone()); - assertEquals(future.get(), echoMessage); - assertTrue(future.isDone()); + assertTrue(client.isConnected()); + Interface stub = client.getStub(); + + client.close(); // close connection + assertFalse(client.isConnected()); + + for (int i = 0; i < repeat; i++) { + try { + CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + stub.echo(future.getController(), echoMessage, future); + assertEquals(echoMessage, future.get()); + assertTrue(future.isDone()); + assertTrue(client.isConnected()); + } finally { + client.close(); // close connection + assertFalse(client.isConnected()); + } + } } - @Test + @Test(timeout = 60000) + @SetupRpcConnection(setupRpcClient = false) public void testIdleTimeout() throws Exception { RpcClientManager.RpcConnectionKey rpcConnectionKey = new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); - AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout - client.connect(); + //500 millis idle timeout + AsyncRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, false); assertTrue(client.isConnected()); - Thread.sleep(2000); + Thread.sleep(600); //timeout assertFalse(client.isConnected()); client.connect(); // try to reconnect assertTrue(client.isConnected()); - client.close(); + + Thread.sleep(600); //timeout assertFalse(client.isConnected()); + client.close(); } - @Test - public void testIdleTimeoutWithActiveRequest() throws Exception { + @Test(timeout = 60000) + @SetupRpcConnection(setupRpcClient = false) + public void testPingOnIdle() throws Exception { RpcClientManager.RpcConnectionKey rpcConnectionKey = new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); - AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout - client.connect(); + //500 millis request timeout + AsyncRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, true); + assertTrue(client.isConnected()); + + Thread.sleep(600); + assertTrue(client.isConnected()); + + Thread.sleep(600); + assertTrue(client.isConnected()); + client.close(); + } + + @Test(timeout = 60000) + @SetupRpcConnection(setupRpcClient = false) + public void testIdleTimeoutWithActiveRequest() throws Exception { + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); + //500 millis idle timeout + AsyncRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, false); assertTrue(client.isConnected()); + Interface stub = client.getStub(); EchoMessage echoMessage = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); - stub.deley(null, echoMessage, future); //3 sec delay + stub.delay(future.getController(), echoMessage, future); //3 sec delay + assertTrue(client.isConnected()); assertFalse(future.isDone()); - assertEquals(future.get(), echoMessage); + assertEquals(echoMessage, future.get()); assertTrue(future.isDone()); - Thread.sleep(2000); + assertTrue(client.getActiveRequests() == 0); + Thread.sleep(600); //timeout assertFalse(client.isConnected()); } + + @Test(timeout = 60000) + @SetupRpcConnection(setupRpcClient = false) + public void testRequestTimeoutOnBusy() throws Exception { + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); + + //500 millis request timeout + AsyncRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, true); + assertTrue(client.isConnected()); + + Interface stub = client.getStub(); + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + stub.busy(future.getController(), echoMessage, future); //30 sec delay + assertFalse(future.isDone()); + + EchoMessage result = null; + try { + result = future.get(); + } catch (ExecutionException e) { + } + + assertEquals(null, result); + assertTrue(future.getController().errorText(), future.getController().failed()); + assertTrue(client.getActiveRequests() == 0); + client.close(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java index 568eb63..6f7fdd1 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java @@ -36,6 +36,7 @@ import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.util.concurrent.CountDownLatch; @@ -50,18 +51,20 @@ public class TestBlockingRpc { private BlockingRpcClient client; private BlockingInterface stub; private DummyProtocolBlockingImpl service; + RpcClientManager manager = RpcClientManager.getInstance(); private int retries; - + @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface SetupRpcConnection { boolean setupRpcServer() default true; + boolean setupRpcClient() default true; } - + @Rule public ExternalResource resource = new ExternalResource() { - + private Description description; @Override @@ -73,11 +76,11 @@ public class TestBlockingRpc { @Override protected void before() throws Throwable { SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class); - + if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { setUpRpcServer(); } - + if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { setUpRpcClient(); } @@ -86,7 +89,7 @@ public class TestBlockingRpc { @Override protected void after() { SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class); - + if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { try { tearDownRpcClient(); @@ -94,7 +97,7 @@ public class TestBlockingRpc { fail(e.getMessage()); } } - + if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { try { tearDownRpcServer(); @@ -103,25 +106,24 @@ public class TestBlockingRpc { } } } - + }; - + public void setUpRpcServer() throws Exception { service = new DummyProtocolBlockingImpl(); server = new BlockingRpcServer(DummyProtocol.class, service, new InetSocketAddress("127.0.0.1", 0), 2); server.start(); } - + public void setUpRpcClient() throws Exception { retries = 1; RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey( - RpcUtils.getConnectAddress(server.getListenAddress()), - DummyProtocol.class, false); - client = new BlockingRpcClient(rpcConnectionKey, retries); - client.connect(); + new RpcClientManager.RpcConnectionKey( + RpcUtils.getConnectAddress(server.getListenAddress()), + DummyProtocol.class, false); + client = manager.newClient(rpcConnectionKey, retries, 10, TimeUnit.SECONDS, true); assertTrue(client.isConnected()); stub = client.getStub(); } @@ -130,16 +132,16 @@ public class TestBlockingRpc { public static void tearDownClass() throws Exception { RpcChannelFactory.shutdownGracefully(); } - + public void tearDownRpcServer() throws Exception { - if(server != null) { + if (server != null) { server.shutdown(); server = null; } } - + public void tearDownRpcClient() throws Exception { - if(client != null) { + if (client != null) { client.close(); client = null; } @@ -162,7 +164,14 @@ public class TestBlockingRpc { } @Test - @SetupRpcConnection(setupRpcClient=false) + public void testGetNull() throws Exception { + assertNull(stub.getNull(null, null)); + assertTrue(service.getNullCalled); + } + + @Test + @SetupRpcConnection(setupRpcClient = false) + @Deprecated // serverCallable will be remove public void testRpcWithServiceCallable() throws Exception { RpcClientManager manager = RpcClientManager.getInstance(); final SumRequest request = SumRequest.newBuilder() @@ -172,15 +181,15 @@ public class TestBlockingRpc { .setX4(2.0f).build(); SumResponse response = - new ServerCallable<SumResponse>(manager, - server.getListenAddress(), DummyProtocol.class, false) { - @Override - public SumResponse call(NettyClientBase client) throws Exception { - BlockingInterface stub2 = client.getStub(); - SumResponse response1 = stub2.sum(null, request); - return response1; - } - }.withRetries(); + new ServerCallable<SumResponse>(manager, + server.getListenAddress(), DummyProtocol.class, false) { + @Override + public SumResponse call(NettyClientBase client) throws Exception { + BlockingInterface stub2 = client.getStub(); + SumResponse response1 = stub2.sum(null, request); + return response1; + } + }.withRetries(); assertEquals(8.15d, response.getResult(), 1e-15); @@ -207,10 +216,8 @@ public class TestBlockingRpc { try { stub.throwException(null, message); fail("RpcCall should throw exception"); - } catch (Throwable t) { - assertTrue(t instanceof TajoServiceException); - assertEquals("Exception Test", t.getMessage()); - TajoServiceException te = (TajoServiceException)t; + } catch (TajoServiceException te) { + assertEquals("Exception Test", te.getMessage()); assertEquals("org.apache.tajo.rpc.test.DummyProtocol", te.getProtocol()); assertEquals(server.getListenAddress().getAddress().getHostAddress() + ":" + server.getListenAddress().getPort(), te.getRemoteAddress()); @@ -222,83 +229,57 @@ public class TestBlockingRpc { EchoMessage message = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); + DefaultRpcController controller = new DefaultRpcController(); try { - stub.throwException(null, message); + stub.throwException(controller, message); fail("RpcCall should throw exception"); - } catch (Throwable t) { - assertTrue(t instanceof TajoServiceException); + } catch (TajoServiceException t) { + assertTrue(controller.failed()); + assertNotNull(controller.errorText()); } - EchoMessage message1 = stub.deley(null, message); + controller.reset(); + EchoMessage message1 = stub.delay(controller, message); assertEquals(message, message1); + assertFalse(controller.failed()); + assertNull(controller.errorText()); } - @Test - @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false) - public void testConnectionRetry() throws Exception { - retries = 10; - ServerSocket serverSocket = new ServerSocket(0); - final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort()); - serverSocket.close(); - + @Test(timeout = 60000) + public void testServerShutdown1() throws Exception { EchoMessage message = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); - //lazy startup - Thread serverThread = new Thread(new Runnable() { - @Override - public void run() { - try { - Thread.sleep(1000); - server = new BlockingRpcServer(DummyProtocol.class, new DummyProtocolBlockingImpl(), address, 2); - } catch (Exception e) { - fail(e.getMessage()); - } - server.start(); - } - }); - serverThread.start(); - - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, false); - client = new BlockingRpcClient(rpcConnectionKey, retries); - client.connect(); - assertTrue(client.isConnected()); - stub = client.getStub(); - - EchoMessage response = stub.echo(null, message); - assertEquals(MESSAGE, response.getMessage()); - } - - @Test - public void testConnectionFailed() throws Exception { - NettyClientBase client = null; - boolean expected = false; + tearDownRpcServer(); + boolean expect = false; try { - int port = server.getListenAddress().getPort() + 1; - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey( - RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), - DummyProtocol.class, false); - client = new BlockingRpcClient(rpcConnectionKey, retries); - client.connect(); - fail(); - } catch (ConnectTimeoutException e) { - expected = true; - } catch (Throwable e) { - fail(e.getMessage()); + EchoMessage response = stub.echo(null, message); + fail(response.getMessage()); + } catch (TajoServiceException e) { + expect = true; } - assertTrue(expected); + assertTrue(expect); } - @Test - public void testGetNull() throws Exception { - assertNull(stub.getNull(null, null)); - assertTrue(service.getNullCalled); + @Test(timeout = 60000) + public void testServerShutdown2() throws Exception { + EchoMessage message = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + + tearDownRpcServer(); + boolean expect = false; + try { + BlockingInterface stub = client.getStub(); + EchoMessage response = stub.echo(null, message); + fail(response.getMessage()); + } catch (TajoServiceException e) { + expect = true; + } + assertTrue(expect); } @Test - public void testShutdown() throws Exception { + public void testServerShutdown3() throws Exception { final StringBuilder error = new StringBuilder(); Thread callThread = new Thread() { public void run() { @@ -306,11 +287,11 @@ public class TestBlockingRpc { EchoMessage message = EchoMessage.newBuilder() .setMessage(MESSAGE) .build(); - stub.deley(null, message); + stub.delay(null, message); } catch (Exception e) { error.append(e.getMessage()); } - synchronized(error) { + synchronized (error) { error.notifyAll(); } } @@ -340,67 +321,244 @@ public class TestBlockingRpc { assertTrue(latch.getCount() == 0); - synchronized(error) { + synchronized (error) { error.wait(5 * 1000); } - if(!error.toString().isEmpty()) { + if (!error.toString().isEmpty()) { fail(error.toString()); } } - @Test - @SetupRpcConnection(setupRpcClient=false) - public void testUnresolvedAddress() throws Exception { - String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); + @Test(timeout = 60000) + @SetupRpcConnection(setupRpcServer = false, setupRpcClient = false) + public void testClientRetryOnStartup() throws Exception { + retries = 10; + ServerSocket serverSocket = new ServerSocket(0); + final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort()); + serverSocket.close(); + + EchoMessage message = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + + //lazy startup + Thread serverThread = new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(1000); + server = new BlockingRpcServer(DummyProtocol.class, new DummyProtocolBlockingImpl(), address, 2); + } catch (Exception e) { + fail(e.getMessage()); + } + server.start(); + } + }); + serverThread.start(); + RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey( - RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false); - client = new BlockingRpcClient(rpcConnectionKey, retries); - client.connect(); + new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, false); + + BlockingRpcClient client = manager.newClient(rpcConnectionKey, + retries, 0, TimeUnit.MILLISECONDS, false); assertTrue(client.isConnected()); + BlockingInterface stub = client.getStub(); + EchoMessage response = stub.echo(null, message); + assertEquals(MESSAGE, response.getMessage()); + client.close(); + server.shutdown(); + } + + @Test(timeout = 60000) + @SetupRpcConnection(setupRpcServer = false, setupRpcClient = false) + public void testClientRetryFailureOnStartup() throws Exception { + retries = 2; + ServerSocket serverSocket = new ServerSocket(0); + final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort()); + serverSocket.close(); EchoMessage message = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); - EchoMessage response2 = stub.echo(null, message); - assertEquals(MESSAGE, response2.getMessage()); + + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, false); + BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries); + + try { + client.connect(); + fail(); + } catch (ConnectTimeoutException e) { + assertFalse(e.getMessage(), client.isConnected()); + } + + BlockingInterface stub = client.getStub(); + try { + EchoMessage response = stub.echo(null, message); + fail(); + } catch (TajoServiceException e) { + assertFalse(e.getMessage(), client.isConnected()); + } + RpcClientManager.cleanup(client); + } + + @Test(timeout = 60000) + @SetupRpcConnection(setupRpcServer = false, setupRpcClient = false) + public void testUnresolvedAddress() throws Exception { + InetSocketAddress address = new InetSocketAddress("test", 0); + boolean expected = false; + BlockingRpcClient client = null; + try { + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true); + client = new BlockingRpcClient(rpcConnectionKey, retries); + client.connect(); + fail(); + } catch (ConnectException e) { + expected = true; + } catch (Throwable throwable) { + fail(throwable.getMessage()); + } finally { + client.close(); + } + assertTrue(expected); } @Test + @SetupRpcConnection(setupRpcClient = false) + public void testUnresolvedAddress2() throws Exception { + String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey( + RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false); + BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries); + client.connect(); + assertTrue(client.isConnected()); + + try { + BlockingInterface stub = client.getStub(); + EchoMessage message = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + EchoMessage response2 = stub.echo(null, message); + assertEquals(MESSAGE, response2.getMessage()); + } finally { + client.close(); + } + } + + @Test(timeout = 60000) + @SetupRpcConnection(setupRpcClient = false) + public void testStubRecovery() throws Exception { + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); + BlockingRpcClient client = manager.newClient(rpcConnectionKey, 1, 0, TimeUnit.MILLISECONDS, false); + + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + int repeat = 5; + + assertTrue(client.isConnected()); + BlockingInterface stub = client.getStub(); + + client.close(); // close connection + assertFalse(client.isConnected()); + + for (int i = 0; i < repeat; i++) { + try { + EchoMessage response = stub.echo(null, echoMessage); + assertEquals(MESSAGE, response.getMessage()); + assertTrue(client.isConnected()); + } finally { + client.close(); // close connection + assertFalse(client.isConnected()); + } + } + } + + + @Test(timeout = 60000) + @SetupRpcConnection(setupRpcClient = false) public void testIdleTimeout() throws Exception { RpcClientManager.RpcConnectionKey rpcConnectionKey = new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); - BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout - client.connect(); + //500 millis idle timeout + BlockingRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, false); assertTrue(client.isConnected()); - Thread.sleep(2000); + Thread.sleep(600); //timeout assertFalse(client.isConnected()); client.connect(); // try to reconnect assertTrue(client.isConnected()); - client.close(); + + Thread.sleep(600); //timeout assertFalse(client.isConnected()); + client.close(); } - @Test - public void testIdleTimeoutWithActiveRequest() throws Exception { + @Test(timeout = 60000) + @SetupRpcConnection(setupRpcClient = false) + public void testPingOnIdle() throws Exception { RpcClientManager.RpcConnectionKey rpcConnectionKey = new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); - BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout - client.connect(); + //500 millis request timeout + BlockingRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, true); + assertTrue(client.isConnected()); + + Thread.sleep(600); + assertTrue(client.isConnected()); + Thread.sleep(600); assertTrue(client.isConnected()); + client.close(); + } + + @Test(timeout = 60000) + @SetupRpcConnection(setupRpcClient = false) + public void testIdleTimeoutWithActiveRequest() throws Exception { + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); + //500 millis idle timeout + BlockingRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, false); + assertTrue(client.isConnected()); + BlockingInterface stub = client.getStub(); EchoMessage echoMessage = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); - EchoMessage message = stub.deley(null, echoMessage); //3 sec delay + EchoMessage message = stub.delay(null, echoMessage); //3 sec delay assertEquals(message, echoMessage); + assertTrue(client.isConnected()); - Thread.sleep(2000); + assertTrue(client.getActiveRequests() == 0); + Thread.sleep(600); //timeout assertFalse(client.isConnected()); } + + @Test(timeout = 60000) + @SetupRpcConnection(setupRpcClient = false) + public void testRequestTimeoutOnBusy() throws Exception { + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); + + //500 millis request timeout + BlockingRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, true); + assertTrue(client.isConnected()); + + BlockingInterface stub = client.getStub(); + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + + boolean expected = false; + try { + EchoMessage message = stub.busy(null, echoMessage); //30 sec delay + fail(); + } catch (TajoServiceException e) { + expected = true; + } finally { + client.close(); + } + assertTrue(expected); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java index 5f86518..71a2b6f 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java @@ -28,10 +28,9 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; public class TestRpcClientManager { @@ -39,59 +38,141 @@ public class TestRpcClientManager { public void testRaceCondition() throws Exception { final int parallelCount = 50; final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl(); + ExecutorService executor = Executors.newFixedThreadPool(parallelCount); + NettyServerBase server = new AsyncRpcServer(DummyProtocol.class, service, new InetSocketAddress("127.0.0.1", 0), parallelCount); server.start(); + try { + final InetSocketAddress address = server.getListenAddress(); + final RpcClientManager manager = RpcClientManager.getInstance(); - final InetSocketAddress address = server.getListenAddress(); - final RpcClientManager manager = RpcClientManager.getInstance(); - - ExecutorService executor = Executors.newFixedThreadPool(parallelCount); - List<Future> tasks = new ArrayList<Future>(); - for (int i = 0; i < parallelCount; i++) { - tasks.add(executor.submit(new Runnable() { - @Override - public void run() { - NettyClientBase client = null; - try { - client = manager.getClient(address, DummyProtocol.class, false); - } catch (Throwable e) { - fail(e.getMessage()); + List<Future> tasks = new ArrayList<Future>(); + for (int i = 0; i < parallelCount; i++) { + tasks.add(executor.submit(new Runnable() { + @Override + public void run() { + NettyClientBase client = null; + try { + client = manager.getClient(address, DummyProtocol.class, false); + } catch (Throwable e) { + fail(e.getMessage()); + } + assertTrue(client.isConnected()); } - assertTrue(client.isConnected()); - } - }) - ); + }) + ); + } + + for (Future future : tasks) { + future.get(); + } + + NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, false); + RpcClientManager.cleanup(clientBase); + } finally { + server.shutdown(); + executor.shutdown(); + RpcClientManager.close(); } + } + + @Test + public void testClientCloseEvent() throws Exception { + final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl(); + NettyServerBase server = new AsyncRpcServer(DummyProtocol.class, + service, new InetSocketAddress("127.0.0.1", 0), 3); + server.start(); + RpcClientManager manager = RpcClientManager.getInstance(); + + try { + + NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true); + assertTrue(client.isConnected()); + assertTrue(client.getChannel().isWritable()); + + RpcClientManager.RpcConnectionKey key = client.getKey(); + assertTrue(RpcClientManager.contains(key)); - for (Future future : tasks) { - future.get(); + client.close(); + assertFalse(RpcClientManager.contains(key)); + } finally { + server.shutdown(); + RpcClientManager.close(); } + } + + @Test + public void testClientCloseEventWithReconnect() throws Exception { + final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl(); + NettyServerBase server = new AsyncRpcServer(DummyProtocol.class, + service, new InetSocketAddress("127.0.0.1", 0), 3); + server.start(); + int repeat = 10; + RpcClientManager manager = RpcClientManager.getInstance(); + + try { - NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, false); - RpcClientManager.cleanup(clientBase); - server.shutdown(); - executor.shutdown(); + NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true); + assertTrue(client.isConnected()); + + RpcClientManager.RpcConnectionKey key = client.getKey(); + assertTrue(RpcClientManager.contains(key)); + + client.close(); + assertFalse(client.isConnected()); + assertFalse(RpcClientManager.contains(key)); + + for (int i = 0; i < repeat; i++) { + client.connect(); + assertTrue(client.isConnected()); + assertTrue(RpcClientManager.contains(key)); + + client.close(); + assertFalse(client.isConnected()); + assertFalse(RpcClientManager.contains(key)); + } + } finally { + server.shutdown(); + RpcClientManager.close(); + } } @Test - public void testCloseFuture() throws Exception { + public void testUnManagedClient() throws Exception { final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl(); NettyServerBase server = new AsyncRpcServer(DummyProtocol.class, service, new InetSocketAddress("127.0.0.1", 0), 3); server.start(); + RpcClientManager.RpcConnectionKey key = + new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); + RpcClientManager.close(); + RpcClientManager manager = RpcClientManager.getInstance(); + + try { + NettyClientBase client1 = manager.newClient(key, 0, 0, TimeUnit.SECONDS, false); + assertTrue(client1.isConnected()); + assertFalse(RpcClientManager.contains(key)); - final RpcClientManager manager = RpcClientManager.getInstance(); + NettyClientBase client2 = manager.newClient(key, 0, 0, TimeUnit.SECONDS, false); + assertTrue(client2.isConnected()); + assertFalse(RpcClientManager.contains(key)); - NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true); - assertTrue(client.isConnected()); - assertTrue(client.getChannel().isWritable()); + assertEquals(client1.getRemoteAddress(), client2.getRemoteAddress()); + assertNotEquals(client1.getChannel(), client2.getChannel()); - RpcClientManager.RpcConnectionKey key = client.getKey(); - assertTrue(RpcClientManager.contains(key)); + client1.close(); + assertFalse(client1.isConnected()); + assertTrue(client2.isConnected()); - client.close(); - assertFalse(RpcClientManager.contains(key)); - server.shutdown(); + client1.connect(); + client2.close(); + assertFalse(client2.isConnected()); + assertTrue(client1.isConnected()); + + RpcClientManager.cleanup(client1, client2); + } finally { + server.shutdown(); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java index 0ca7563..abcc057 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java @@ -68,7 +68,7 @@ public class DummyProtocolAsyncImpl implements Interface { } @Override - public void deley(RpcController controller, EchoMessage request, + public void delay(RpcController controller, EchoMessage request, RpcCallback<EchoMessage> done) { try { Thread.sleep(3000); @@ -79,8 +79,25 @@ public class DummyProtocolAsyncImpl implements Interface { done.run(request); } + @Override + public void busy(RpcController controller, EchoMessage request, + RpcCallback<EchoMessage> done) { + try { + Thread.sleep(30000); + } catch (InterruptedException e) { + LOG.error(e.getMessage()); + } + done.run(request); + } + + @Override public void throwException(RpcController controller, EchoMessage request, RpcCallback<EchoMessage> done) { - done.run(request); + if(controller != null) { + controller.setFailed("throwException"); + done.run(request); + } else { + throw new RuntimeException("throwException"); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/2e7d03df/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java index 8d4b597..40eb18f 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java @@ -65,7 +65,7 @@ public class DummyProtocolBlockingImpl implements BlockingInterface { } @Override - public EchoMessage deley(RpcController controller, EchoMessage request) + public EchoMessage delay(RpcController controller, EchoMessage request) throws ServiceException { try { Thread.sleep(3000); @@ -76,6 +76,17 @@ public class DummyProtocolBlockingImpl implements BlockingInterface { return request; } + @Override + public EchoMessage busy(RpcController controller, EchoMessage request) { + try { + Thread.sleep(30000); + } catch (InterruptedException e) { + LOG.error(e.getMessage()); + } + return request; + } + + @Override public EchoMessage throwException(RpcController controller, EchoMessage request) throws ServiceException { throw new ServiceException("Exception Test");
