http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java deleted file mode 100644 index ed6b634..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.rpc; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; - -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - -public final class RpcChannelFactory { - private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class); - - private static final int DEFAULT_WORKER_NUM = Runtime.getRuntime().availableProcessors() * 2; - - private static final Object lockObjectForLoopGroup = new Object(); - private static AtomicInteger serverCount = new AtomicInteger(0); - - public enum ClientChannelId { - CLIENT_DEFAULT, - FETCHER - } - - private static final Map<ClientChannelId, Integer> defaultMaxKeyPoolCount = - new ConcurrentHashMap<ClientChannelId, Integer>(); - private static final Map<ClientChannelId, Queue<EventLoopGroup>> eventLoopGroupPool = - new ConcurrentHashMap<ClientChannelId, Queue<EventLoopGroup>>(); - - private RpcChannelFactory(){ - } - - static { - Runtime.getRuntime().addShutdownHook(new CleanUpHandler()); - - defaultMaxKeyPoolCount.put(ClientChannelId.CLIENT_DEFAULT, 1); - defaultMaxKeyPoolCount.put(ClientChannelId.FETCHER, 1); - } - - /** - * make this factory static thus all clients can share its thread pool. - * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe - */ - public static EventLoopGroup getSharedClientEventloopGroup() { - return getSharedClientEventloopGroup(DEFAULT_WORKER_NUM); - } - - /** - * make this factory static thus all clients can share its thread pool. 
- * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe - * - * @param workerNum The number of workers - */ - public static EventLoopGroup getSharedClientEventloopGroup(int workerNum){ - //shared woker and boss pool - return getSharedClientEventloopGroup(ClientChannelId.CLIENT_DEFAULT, workerNum); - } - - /** - * This function return eventloopgroup by key. Fetcher client will have one or more eventloopgroup for its throughput. - * - * @param clientId - * @param workerNum - * @return - */ - public static EventLoopGroup getSharedClientEventloopGroup(ClientChannelId clientId, int workerNum) { - Queue<EventLoopGroup> eventLoopGroupQueue; - EventLoopGroup returnEventLoopGroup; - - synchronized (lockObjectForLoopGroup) { - eventLoopGroupQueue = eventLoopGroupPool.get(clientId); - if (eventLoopGroupQueue == null) { - eventLoopGroupQueue = createClientEventloopGroups(clientId, workerNum); - } - - returnEventLoopGroup = eventLoopGroupQueue.poll(); - if (isEventLoopGroupShuttingDown(returnEventLoopGroup)) { - returnEventLoopGroup = createClientEventloopGroup(clientId.name(), workerNum); - } - eventLoopGroupQueue.add(returnEventLoopGroup); - } - - return returnEventLoopGroup; - } - - protected static boolean isEventLoopGroupShuttingDown(EventLoopGroup eventLoopGroup) { - return ((eventLoopGroup == null) || eventLoopGroup.isShuttingDown()); - } - - // Client must release the external resources - protected static Queue<EventLoopGroup> createClientEventloopGroups(ClientChannelId clientId, int workerNum) { - int defaultMaxObjectCount = defaultMaxKeyPoolCount.get(clientId); - Queue<EventLoopGroup> loopGroupQueue = new ConcurrentLinkedQueue<EventLoopGroup>(); - eventLoopGroupPool.put(clientId, loopGroupQueue); - - for (int objectIdx = 0; objectIdx < defaultMaxObjectCount; objectIdx++) { - loopGroupQueue.add(createClientEventloopGroup(clientId.name(), workerNum)); - } - - return loopGroupQueue; - } - - protected static EventLoopGroup 
createClientEventloopGroup(String name, int workerNum) { - if (LOG.isDebugEnabled()) { - LOG.debug("Create " + name + " ClientEventLoopGroup. Worker:" + workerNum); - } - - ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); - ThreadFactory clientFactory = builder.setNameFormat(name + " Client #%d").build(); - - return new NioEventLoopGroup(workerNum, clientFactory); - } - - // Client must release the external resources - public static ServerBootstrap createServerChannelFactory(String name, int workerNum) { - name = name + "-" + serverCount.incrementAndGet(); - if(LOG.isInfoEnabled()){ - LOG.info("Create " + name + " ServerSocketChannelFactory. Worker:" + workerNum); - } - ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); - ThreadFactory bossFactory = builder.setNameFormat(name + " Server Boss #%d").build(); - ThreadFactory workerFactory = builder.setNameFormat(name + " Server Worker #%d").build(); - - EventLoopGroup bossGroup = - new NioEventLoopGroup(1, bossFactory); - EventLoopGroup workerGroup = - new NioEventLoopGroup(workerNum, workerFactory); - - return new ServerBootstrap().group(bossGroup, workerGroup); - } - - public static void shutdownGracefully(){ - if(LOG.isDebugEnabled()) { - LOG.debug("Shutdown Shared RPC Pool"); - } - - synchronized(lockObjectForLoopGroup) { - for (Queue<EventLoopGroup> eventLoopGroupQueue: eventLoopGroupPool.values()) { - for (EventLoopGroup eventLoopGroup: eventLoopGroupQueue) { - eventLoopGroup.shutdownGracefully(); - } - - eventLoopGroupQueue.clear(); - } - eventLoopGroupPool.clear(); - } - } - - static class CleanUpHandler extends Thread { - - @Override - public void run() { - RpcChannelFactory.shutdownGracefully(); - } - - } -}
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java deleted file mode 100644 index 6d1f479..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.rpc; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import io.netty.channel.ConnectTimeoutException; -import io.netty.util.internal.logging.CommonsLoggerFactory; -import io.netty.util.internal.logging.InternalLoggerFactory; - -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.Map; - -public class RpcConnectionPool { - private static final Log LOG = LogFactory.getLog(RpcConnectionPool.class); - - private Map<RpcConnectionKey, NettyClientBase> connections = - new HashMap<RpcConnectionKey, NettyClientBase>(); - - private static RpcConnectionPool instance; - private final Object lockObject = new Object(); - - public final static int RPC_RETRIES = 3; - - private RpcConnectionPool() { - } - - public synchronized static RpcConnectionPool getPool() { - if(instance == null) { - InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory()); - instance = new RpcConnectionPool(); - } - return instance; - } - - private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey) - throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { - NettyClientBase client; - if(rpcConnectionKey.asyncMode) { - client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES); - } else { - client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES); - } - return client; - } - - public static final long DEFAULT_TIMEOUT = 3000; - public static final long DEFAULT_INTERVAL = 500; - - public NettyClientBase getConnection(InetSocketAddress addr, - Class<?> protocolClass, boolean asyncMode) - throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { - return getConnection(addr, protocolClass, asyncMode, DEFAULT_TIMEOUT, DEFAULT_INTERVAL); - } - - public NettyClientBase getConnection(InetSocketAddress addr, - Class<?> protocolClass, boolean asyncMode, long timeout, long interval) - throws NoSuchMethodException, ClassNotFoundException, 
ConnectTimeoutException { - RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode); - - RpcUtils.Timer timer = new RpcUtils.Timer(timeout); - for (; !timer.isTimedOut(); timer.elapsed()) { - NettyClientBase client; - synchronized (lockObject) { - client = connections.get(key); - if (client == null) { - connections.put(key, client = makeConnection(key)); - } - } - if (client.acquire(timer.remaining())) { - return client; - } - timer.interval(interval); - } - - throw new ConnectTimeoutException("Failed to get connection for " + timeout + " msec"); - } - - public void releaseConnection(NettyClientBase client) { - release(client, false); - } - - public void closeConnection(NettyClientBase client) { - release(client, true); - } - - private void release(NettyClientBase client, boolean close) { - if (client == null) { - return; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Close connection [" + client.getKey() + "]"); - } - try { - if (returnToPool(client, close)) { - client.close(); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Current Connections [" + connections.size() + "]"); - } - } catch (Exception e) { - LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e); - } - } - - // return true if the connection should be closed - private boolean returnToPool(NettyClientBase client, boolean close) { - synchronized (lockObject) { - if (client.release() && (close || !client.isConnected())) { - connections.remove(client.getKey()); - return true; - } - } - return false; - } - - public void close() { - if(LOG.isDebugEnabled()) { - LOG.debug("Pool Closed"); - } - - synchronized (lockObject) { - for (NettyClientBase eachClient : connections.values()) { - try { - eachClient.close(); - } catch (Exception e) { - LOG.error("close client pool error", e); - } - } - connections.clear(); - } - } - - public void shutdown(){ - close(); - RpcChannelFactory.shutdownGracefully(); - } - - static class RpcConnectionKey { - final InetSocketAddress 
addr; - final Class<?> protocolClass; - final boolean asyncMode; - - final String description; - - public RpcConnectionKey(InetSocketAddress addr, - Class<?> protocolClass, boolean asyncMode) { - this.addr = addr; - this.protocolClass = protocolClass; - this.asyncMode = asyncMode; - this.description = "["+ protocolClass + "] " + addr + "," + asyncMode; - } - - @Override - public String toString() { - return description; - } - - @Override - public boolean equals(Object obj) { - if(!(obj instanceof RpcConnectionKey)) { - return false; - } - - return toString().equals(obj.toString()); - } - - @Override - public int hashCode() { - return description.hashCode(); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java deleted file mode 100644 index 152d426..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
// - */   (diff-archive residue: end of the license comment begun on the preceding physical line)

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Address helpers plus two small concurrency utilities ({@link Timer}, {@link Scrutineer})
 * used by the RPC layer.
 */
public class RpcUtils {

  /**
   * Formats {@code addr} as {@code "<ip>:<port>"} using the literal IP address.
   *
   * @param addr a resolved socket address
   */
  public static String normalizeInetSocketAddress(InetSocketAddress addr) {
    return addr.getAddress().getHostAddress() + ":" + addr.getPort();
  }

  /**
   * Builds a socket address from a host name and a port.
   *
   * @param host host name or IP literal
   * @param port port number
   */
  public static InetSocketAddress createSocketAddr(String host, int port) {
    return new InetSocketAddress(host, port);
  }

  /**
   * Returns an address that a client can use to connect to a listener bound to {@code addr}.
   * A wildcard ("any local") address is replaced by the local host, or by {@code 127.0.0.1}
   * when the local host cannot be determined; the result carries the canonical host name.
   * An unresolved address has no IP to canonicalize and is returned unchanged.
   *
   * @param addr address of a listener
   * @return socket address that a client can use to connect to the server
   */
  public static InetSocketAddress getConnectAddress(InetSocketAddress addr) {
    if (addr.isUnresolved()) {
      // getAddress() is null here; dereferencing it below would throw NullPointerException.
      return addr;
    }
    if (addr.getAddress().isAnyLocalAddress()) {
      try {
        addr = new InetSocketAddress(InetAddress.getLocalHost(), addr.getPort());
      } catch (UnknownHostException uhe) {
        // shouldn't get here unless the host doesn't have a loopback iface
        addr = new InetSocketAddress("127.0.0.1", addr.getPort());
      }
    }
    return new InetSocketAddress(addr.getAddress().getCanonicalHostName(), addr.getPort());
  }

  /**
   * Parses {@code "<host>:<port>"} into an unresolved socket address (no DNS lookup).
   *
   * @param addr text of the form {@code <host>:<port>}
   * @throws IllegalArgumentException if the port part is missing or is not a number
   */
  public static InetSocketAddress createUnresolved(String addr) {
    String[] splitted = addr.split(":");
    if (splitted.length < 2) {
      throw new IllegalArgumentException("Expected <host>:<port> but got: " + addr);
    }
    return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1]));
  }

  /**
   * Countdown over a time budget in milliseconds. The budget only shrinks when
   * {@link #elapsed()} is called. Not thread-safe.
   */
  public static class Timer {
    private long remaining;
    private long prev;

    /**
     * @param timeout total budget in milliseconds
     */
    public Timer(long timeout) {
      this.remaining = timeout;
      this.prev = System.currentTimeMillis();
    }

    /** Returns true once the budget is used up. */
    public boolean isTimedOut() {
      return remaining <= 0;
    }

    /** Charges the time that has passed since construction or the previous call to the budget. */
    public void elapsed() {
      long current = System.currentTimeMillis();
      // Subtract "now - then". The reversed operands would add time instead of removing it.
      remaining -= (current - prev);
      prev = current;
    }

    /**
     * Sleeps for {@code wait} milliseconds, capped at the remaining budget. Returns immediately
     * when {@code wait} is not positive or the timer has timed out. If the sleep is interrupted
     * the thread's interrupt status is restored and the method returns early.
     *
     * @param wait requested pause in milliseconds
     */
    public void interval(long wait) {
      if (wait <= 0 || isTimedOut()) {
        return;
      }
      try {
        Thread.sleep(Math.min(remaining, wait));
      } catch (InterruptedException ie) {
        // Keep the interruption visible to the caller instead of swallowing it.
        Thread.currentThread().interrupt();
      }
    }

    /** Returns the budget left, in milliseconds, as of the last {@link #elapsed()} call. */
    public long remaining() {
      return remaining;
    }
  }

  /**
   * Holds at most one "ticket" at a time, using compare-and-set on an atomic reference.
   *
   * @param <T> ticket type
   */
  public static class Scrutineer<T> {

    private final AtomicReference<T> reference = new AtomicReference<T>();

    /**
     * Tries to install {@code ticket} as the current holder.
     *
     * @return {@code ticket} if it was installed, otherwise the ticket that is already held
     */
    T check(T ticket) {
      T granted = reference.get();
      for (; granted == null; granted = reference.get()) {
        if (reference.compareAndSet(null, ticket)) {
          return ticket;
        }
      }
      return granted;
    }

    /**
     * Frees the slot if {@code granted} is the current holder (compared by reference).
     *
     * @return true if the slot was freed
     */
    boolean clear(T granted) {
      return reference.compareAndSet(granted, null);
    }
  }
}
// Diff-archive residue sharing this physical line (start of the next file's diff header), kept verbatim:
// http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java ---------------------------------------------------------------------- diff --git
a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java deleted file mode 100644 index fb1cec2..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java +++ /dev/null @@ -1,162 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.rpc; - -import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; - -import com.google.protobuf.ServiceException; - -public abstract class ServerCallable<T> { - protected InetSocketAddress addr; - protected long startTime; - protected long endTime; - protected Class<?> protocol; - protected boolean asyncMode; - protected boolean closeConn; - protected RpcConnectionPool connPool; - - public abstract T call(NettyClientBase client) throws Exception; - - public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol, boolean asyncMode) { - this(connPool, addr, protocol, asyncMode, false); - } - - public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol, - boolean asyncMode, boolean closeConn) { - this.connPool = connPool; - this.addr = addr; - this.protocol = protocol; - this.asyncMode = asyncMode; - this.closeConn = closeConn; - } - - public void beforeCall() { - this.startTime = System.currentTimeMillis(); - } - - public long getStartTime(){ - return startTime; - } - - public void afterCall() { - this.endTime = System.currentTimeMillis(); - } - - public long getEndTime(){ - return endTime; - } - - boolean abort = false; - public void abort() { - abort = true; - } - /** - * Run this instance with retries, timed waits, - * and refinds of missing regions. 
- * - * @param <T> the type of the return value - * @return an object of type T - * @throws com.google.protobuf.ServiceException if a remote or network exception occurs - */ - public T withRetries() throws ServiceException { - //TODO configurable - final long pause = 500; //ms - final int numRetries = 3; - List<Throwable> exceptions = new ArrayList<Throwable>(); - - for (int tries = 0; tries < numRetries; tries++) { - NettyClientBase client = null; - try { - beforeCall(); - if(addr != null) { - client = connPool.getConnection(addr, protocol, asyncMode); - } - return call(client); - } catch (IOException ioe) { - exceptions.add(ioe); - if(abort) { - throw new ServiceException(ioe.getMessage(), ioe); - } - if (tries == numRetries - 1) { - throw new ServiceException("Giving up after tries=" + tries, ioe); - } - } catch (Throwable t) { - throw new ServiceException(t); - } finally { - afterCall(); - if(closeConn) { - connPool.closeConnection(client); - } else { - connPool.releaseConnection(client); - } - } - try { - Thread.sleep(pause * (tries + 1)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ServiceException("Giving up after tries=" + tries, e); - } - } - return null; - } - - /** - * Run this instance against the server once. 
- * @param <T> the type of the return value - * @return an object of type T - * @throws java.io.IOException if a remote or network exception occurs - * @throws RuntimeException other unspecified error - */ - public T withoutRetries() throws IOException, RuntimeException { - NettyClientBase client = null; - try { - beforeCall(); - client = connPool.getConnection(addr, protocol, asyncMode); - return call(client); - } catch (Throwable t) { - Throwable t2 = translateException(t); - if (t2 instanceof IOException) { - throw (IOException)t2; - } else { - throw new RuntimeException(t2); - } - } finally { - afterCall(); - if(closeConn) { - connPool.closeConnection(client); - } else { - connPool.releaseConnection(client); - } - } - } - - private static Throwable translateException(Throwable t) throws IOException { - if (t instanceof UndeclaredThrowableException) { - t = t.getCause(); - } - if (t instanceof RemoteException && t.getCause() != null) { - t = t.getCause(); - } - return t; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java deleted file mode 100644 index 113d181..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/TajoServiceException.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import com.google.protobuf.ServiceException; -import org.apache.commons.lang.exception.ExceptionUtils; - -public class TajoServiceException extends ServiceException { - private String traceMessage; - private String protocol; - private String remoteAddress; - - public TajoServiceException(String message) { - super(message); - } - public TajoServiceException(String message, String traceMessage) { - super(message); - this.traceMessage = traceMessage; - } - - public TajoServiceException(String message, Throwable cause, String protocol, String remoteAddress) { - super(message, cause); - - this.protocol = protocol; - this.remoteAddress = remoteAddress; - } - - public String getTraceMessage() { - if(traceMessage == null && getCause() != null){ - this.traceMessage = ExceptionUtils.getStackTrace(getCause()); - } - return traceMessage; - } - - public String getProtocol() { - return protocol; - } - - public String getRemoteAddress() { - return remoteAddress; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/proto/DummyProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/proto/DummyProtos.proto b/tajo-rpc/src/main/proto/DummyProtos.proto deleted file mode 100644 index f53f0d6..0000000 --- a/tajo-rpc/src/main/proto/DummyProtos.proto +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2012 Database Lab., Korea Univ. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -option java_package = "org.apache.tajo.rpc.test"; -option java_outer_classname = "DummyProtos"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; - -message MulRequest1 { - required int32 x1 = 1; - required int32 x2 = 2; -} - -message MulRequest2 { - required int32 x1 = 1; - required int32 x2 = 2; -} - -message MulResponse { - required int32 result1 = 1; - required int32 result2 = 2; -} - -message InnerNode { - required string instr = 1; -} - -message InnerRequest { - repeated InnerNode nodes = 1; -} - -message InnerResponse { - repeated InnerNode nodes = 1; -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/proto/RpcProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/proto/RpcProtos.proto b/tajo-rpc/src/main/proto/RpcProtos.proto deleted file mode 100644 index 69f43ed..0000000 --- a/tajo-rpc/src/main/proto/RpcProtos.proto +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2012 Database Lab., Korea Univ. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -option java_package = "org.apache.tajo.rpc"; -option java_outer_classname = "RpcProtos"; - -message RpcRequest { - required int32 id = 1; - required string method_name = 2; - optional bytes request_message = 3; -} - -message RpcResponse { - required int32 id = 1; - optional bytes response_message = 2; - optional string error_class = 3; - optional string error_message = 4; - optional string error_trace = 5; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/proto/TestProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/proto/TestProtocol.proto b/tajo-rpc/src/main/proto/TestProtocol.proto deleted file mode 100644 index 58640ea..0000000 --- a/tajo-rpc/src/main/proto/TestProtocol.proto +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2012 Database Lab., Korea Univ. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -option java_package = "org.apache.tajo.rpc.test"; -option java_outer_classname = "DummyProtocol"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; - -import "TestProtos.proto"; - -service DummyProtocolService { - rpc sum (SumRequest) returns (SumResponse); - rpc echo (EchoMessage) returns (EchoMessage); - rpc getError (EchoMessage) returns (EchoMessage); - rpc getNull (EchoMessage) returns (EchoMessage); - rpc deley (EchoMessage) returns (EchoMessage); - rpc throwException (EchoMessage) returns (EchoMessage); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/main/proto/TestProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/proto/TestProtos.proto b/tajo-rpc/src/main/proto/TestProtos.proto deleted file mode 100644 index 5001c0e..0000000 --- a/tajo-rpc/src/main/proto/TestProtos.proto +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2012 Database Lab., Korea Univ. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -option java_package = "org.apache.tajo.rpc.test"; -option java_outer_classname = "TestProtos"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; - -message EchoMessage { - required string message = 1; -} - -message SumRequest { - required int32 x1 = 1; - required int64 x2 = 2; - required double x3 = 3; - required float x4 = 4; -} - -message SumResponse { - required double result = 1; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/test/java/log4j.properties ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/log4j.properties b/tajo-rpc/src/test/java/log4j.properties deleted file mode 100644 index 2c4d991..0000000 --- a/tajo-rpc/src/test/java/log4j.properties +++ /dev/null @@ -1,25 +0,0 @@ -## -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -# log4j configuration used during build and unit tests - -log4j.rootLogger=info,stdout -log4j.threshhold=ALL -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p: %c (%M(%L)) - %m%n http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java deleted file mode 100644 index a974a65..0000000 --- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java +++ /dev/null @@ -1,345 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.rpc; - -import com.google.protobuf.RpcCallback; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.rpc.test.DummyProtocol; -import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.Interface; -import org.apache.tajo.rpc.test.TestProtos.EchoMessage; -import org.apache.tajo.rpc.test.TestProtos.SumRequest; -import org.apache.tajo.rpc.test.TestProtos.SumResponse; -import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl; -import org.junit.AfterClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExternalResource; -import org.junit.runner.Description; -import org.junit.runners.model.Statement; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.junit.Assert.*; - -public class TestAsyncRpc { - private static Log LOG = LogFactory.getLog(TestAsyncRpc.class); - private static String MESSAGE = "TestAsyncRpc"; - - double sum; - String echo; - - AsyncRpcServer server; - AsyncRpcClient client; - Interface stub; - DummyProtocolAsyncImpl service; - int retries; - - @Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.METHOD) - @interface SetupRpcConnection { - boolean setupRpcServer() default true; - boolean setupRpcClient() default true; - } - - @Rule - public ExternalResource resource = new ExternalResource() { - - private Description description; - - @Override - public Statement apply(Statement base, Description description) { - this.description = description; - return super.apply(base, description); - } - - @Override - protected void before() throws Throwable { - SetupRpcConnection setupRpcConnection = 
description.getAnnotation(SetupRpcConnection.class); - - if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { - setUpRpcServer(); - } - - if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { - setUpRpcClient(); - } - } - - @Override - protected void after() { - SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class); - - if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { - try { - tearDownRpcClient(); - } catch (Exception e) { - fail(e.getMessage()); - } - } - - if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { - try { - tearDownRpcServer(); - } catch (Exception e) { - fail(e.getMessage()); - } - } - } - - }; - - public void setUpRpcServer() throws Exception { - service = new DummyProtocolAsyncImpl(); - server = new AsyncRpcServer(DummyProtocol.class, - service, new InetSocketAddress("127.0.0.1", 0), 2); - server.start(); - } - - public void setUpRpcClient() throws Exception { - retries = 1; - - RpcConnectionPool.RpcConnectionKey rpcConnectionKey = - new RpcConnectionPool.RpcConnectionKey( - RpcUtils.getConnectAddress(server.getListenAddress()), - DummyProtocol.class, true); - client = new AsyncRpcClient(rpcConnectionKey, retries); - client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT); - stub = client.getStub(); - } - - @AfterClass - public static void tearDownClass() throws Exception { - RpcChannelFactory.shutdownGracefully(); - } - - public void tearDownRpcServer() throws Exception { - if(server != null) { - server.shutdown(); - server = null; - } - } - - public void tearDownRpcClient() throws Exception { - if(client != null) { - client.close(); - client = null; - } - } - - boolean calledMarker = false; - - @Test - public void testRpc() throws Exception { - - SumRequest sumRequest = SumRequest.newBuilder() - .setX1(1) - .setX2(2) - .setX3(3.15d) - .setX4(2.0f).build(); - - stub.sum(null, sumRequest, new RpcCallback<SumResponse>() { - @Override - 
public void run(SumResponse parameter) { - sum = parameter.getResult(); - assertTrue(8.15d == sum); - } - }); - - - EchoMessage echoMessage = EchoMessage.newBuilder() - .setMessage(MESSAGE).build(); - RpcCallback<EchoMessage> callback = new RpcCallback<EchoMessage>() { - @Override - public void run(EchoMessage parameter) { - echo = parameter.getMessage(); - assertEquals(MESSAGE, echo); - calledMarker = true; - } - }; - stub.echo(null, echoMessage, callback); - Thread.sleep(1000); - assertTrue(calledMarker); - } - - private CountDownLatch testNullLatch; - - @Test - public void testGetNull() throws Exception { - testNullLatch = new CountDownLatch(1); - stub.getNull(null, null, new RpcCallback<EchoMessage>() { - @Override - public void run(EchoMessage parameter) { - assertNull(parameter); - LOG.info("testGetNull retrieved"); - testNullLatch.countDown(); - } - }); - assertTrue(testNullLatch.await(1000, TimeUnit.MILLISECONDS)); - assertTrue(service.getNullCalled); - } - - @Test - public void testCallFuture() throws Exception { - EchoMessage echoMessage = EchoMessage.newBuilder() - .setMessage(MESSAGE).build(); - CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); - stub.deley(null, echoMessage, future); - - assertFalse(future.isDone()); - assertEquals(future.get(), echoMessage); - assertTrue(future.isDone()); - } - - @Test - public void testCallFutureTimeout() throws Exception { - boolean timeout = false; - try { - EchoMessage echoMessage = EchoMessage.newBuilder() - .setMessage(MESSAGE).build(); - CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); - stub.deley(null, echoMessage, future); - - assertFalse(future.isDone()); - future.get(1, TimeUnit.SECONDS); - } catch (TimeoutException te) { - timeout = true; - } - assertTrue(timeout); - } - - @Test - public void testCallFutureDisconnected() throws Exception { - EchoMessage echoMessage = EchoMessage.newBuilder() - .setMessage(MESSAGE).build(); - CallFuture<EchoMessage> future = new 
CallFuture<EchoMessage>(); - - tearDownRpcServer(); - - stub.echo(future.getController(), echoMessage, future); - EchoMessage response = future.get(); - - assertNull(response); - assertTrue(future.getController().failed()); - assertTrue(future.getController().errorText() != null); - } - - @Test - public void testStubDisconnected() throws Exception { - - EchoMessage echoMessage = EchoMessage.newBuilder() - .setMessage(MESSAGE).build(); - CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); - - if (server != null) { - server.shutdown(true); - server = null; - } - - stub = client.getStub(); - stub.echo(future.getController(), echoMessage, future); - EchoMessage response = future.get(); - - assertNull(response); - assertTrue(future.getController().failed()); - assertTrue(future.getController().errorText() != null); - } - - @Test - @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false) - public void testConnectionRetry() throws Exception { - retries = 10; - ServerSocket serverSocket = new ServerSocket(0); - final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort()); - serverSocket.close(); - service = new DummyProtocolAsyncImpl(); - - EchoMessage echoMessage = EchoMessage.newBuilder() - .setMessage(MESSAGE).build(); - CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); - - //lazy startup - Thread serverThread = new Thread(new Runnable() { - @Override - public void run() { - try { - Thread.sleep(1000); - server = new AsyncRpcServer(DummyProtocol.class, - service, address, 2); - } catch (Exception e) { - fail(e.getMessage()); - } - server.start(); - } - }); - serverThread.start(); - - RpcConnectionPool.RpcConnectionKey rpcConnectionKey = - new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true); - client = new AsyncRpcClient(rpcConnectionKey, retries); - assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); - stub = client.getStub(); - stub.echo(future.getController(), 
echoMessage, future); - - assertFalse(future.isDone()); - assertEquals(echoMessage, future.get()); - assertTrue(future.isDone()); - } - - @Test - public void testConnectionFailure() throws Exception { - InetSocketAddress address = new InetSocketAddress("test", 0); - try { - RpcConnectionPool.RpcConnectionKey rpcConnectionKey = - new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true); - NettyClientBase client = new AsyncRpcClient(rpcConnectionKey, retries); - assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); - } catch (Throwable throwable) { - fail(); - } - } - - @Test - @SetupRpcConnection(setupRpcClient=false) - public void testUnresolvedAddress() throws Exception { - String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); - RpcConnectionPool.RpcConnectionKey rpcConnectionKey = - new RpcConnectionPool.RpcConnectionKey( - RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true); - client = new AsyncRpcClient(rpcConnectionKey, retries); - assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); - Interface stub = client.getStub(); - EchoMessage echoMessage = EchoMessage.newBuilder() - .setMessage(MESSAGE).build(); - CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); - stub.deley(null, echoMessage, future); - - assertFalse(future.isDone()); - assertEquals(future.get(), echoMessage); - assertTrue(future.isDone()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java deleted file mode 100644 index 10dd766..0000000 --- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java +++ /dev/null @@ -1,349 +0,0 @@ -/** - * Licensed to the Apache Software Foundation 
(ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import org.apache.tajo.rpc.test.DummyProtocol; -import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface; -import org.apache.tajo.rpc.test.TestProtos.EchoMessage; -import org.apache.tajo.rpc.test.TestProtos.SumRequest; -import org.apache.tajo.rpc.test.TestProtos.SumResponse; -import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl; -import org.junit.AfterClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExternalResource; -import org.junit.runner.Description; -import org.junit.runners.model.Statement; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.*; - -public class TestBlockingRpc { - public static final String MESSAGE = "TestBlockingRpc"; - - private BlockingRpcServer server; - private BlockingRpcClient client; - private BlockingInterface stub; - private DummyProtocolBlockingImpl service; - private int retries; - - 
@Retention(RetentionPolicy.RUNTIME) - @Target(ElementType.METHOD) - @interface SetupRpcConnection { - boolean setupRpcServer() default true; - boolean setupRpcClient() default true; - } - - @Rule - public ExternalResource resource = new ExternalResource() { - - private Description description; - - @Override - public Statement apply(Statement base, Description description) { - this.description = description; - return super.apply(base, description); - } - - @Override - protected void before() throws Throwable { - SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class); - - if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { - setUpRpcServer(); - } - - if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { - setUpRpcClient(); - } - } - - @Override - protected void after() { - SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class); - - if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { - try { - tearDownRpcClient(); - } catch (Exception e) { - fail(e.getMessage()); - } - } - - if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { - try { - tearDownRpcServer(); - } catch (Exception e) { - fail(e.getMessage()); - } - } - } - - }; - - public void setUpRpcServer() throws Exception { - service = new DummyProtocolBlockingImpl(); - server = new BlockingRpcServer(DummyProtocol.class, service, - new InetSocketAddress("127.0.0.1", 0), 2); - server.start(); - } - - public void setUpRpcClient() throws Exception { - retries = 1; - - RpcConnectionPool.RpcConnectionKey rpcConnectionKey = - new RpcConnectionPool.RpcConnectionKey( - RpcUtils.getConnectAddress(server.getListenAddress()), - DummyProtocol.class, false); - client = new BlockingRpcClient(rpcConnectionKey, retries); - assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); - stub = client.getStub(); - } - - @AfterClass - public static void tearDownClass() throws 
Exception { - RpcChannelFactory.shutdownGracefully(); - } - - public void tearDownRpcServer() throws Exception { - if(server != null) { - server.shutdown(); - server = null; - } - } - - public void tearDownRpcClient() throws Exception { - if(client != null) { - client.close(); - client = null; - } - } - - @Test - public void testRpc() throws Exception { - SumRequest request = SumRequest.newBuilder() - .setX1(1) - .setX2(2) - .setX3(3.15d) - .setX4(2.0f).build(); - SumResponse response1 = stub.sum(null, request); - assertEquals(8.15d, response1.getResult(), 1e-15); - - EchoMessage message = EchoMessage.newBuilder() - .setMessage(MESSAGE).build(); - EchoMessage response2 = stub.echo(null, message); - assertEquals(MESSAGE, response2.getMessage()); - } - - @Test - @SetupRpcConnection(setupRpcClient=false) - public void testRpcWithServiceCallable() throws Exception { - RpcConnectionPool pool = RpcConnectionPool.getPool(); - final SumRequest request = SumRequest.newBuilder() - .setX1(1) - .setX2(2) - .setX3(3.15d) - .setX4(2.0f).build(); - - SumResponse response = - new ServerCallable<SumResponse>(pool, - server.getListenAddress(), DummyProtocol.class, false) { - @Override - public SumResponse call(NettyClientBase client) throws Exception { - BlockingInterface stub2 = client.getStub(); - SumResponse response1 = stub2.sum(null, request); - return response1; - } - }.withRetries(); - - assertEquals(8.15d, response.getResult(), 1e-15); - - response = - new ServerCallable<SumResponse>(pool, - server.getListenAddress(), DummyProtocol.class, false) { - @Override - public SumResponse call(NettyClientBase client) throws Exception { - BlockingInterface stub2 = client.getStub(); - SumResponse response1 = stub2.sum(null, request); - return response1; - } - }.withoutRetries(); - - assertTrue(8.15d == response.getResult()); - pool.close(); - } - - @Test - public void testThrowException() throws Exception { - EchoMessage message = EchoMessage.newBuilder() - 
.setMessage(MESSAGE).build(); - - try { - stub.throwException(null, message); - fail("RpcCall should throw exception"); - } catch (Throwable t) { - assertTrue(t instanceof TajoServiceException); - assertEquals("Exception Test", t.getMessage()); - TajoServiceException te = (TajoServiceException)t; - assertEquals("org.apache.tajo.rpc.test.DummyProtocol", te.getProtocol()); - assertEquals(server.getListenAddress().getAddress().getHostAddress() + ":" + server.getListenAddress().getPort(), - te.getRemoteAddress()); - } - } - - @Test - @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false) - public void testConnectionRetry() throws Exception { - retries = 10; - ServerSocket serverSocket = new ServerSocket(0); - final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort()); - serverSocket.close(); - - EchoMessage message = EchoMessage.newBuilder() - .setMessage(MESSAGE).build(); - - //lazy startup - Thread serverThread = new Thread(new Runnable() { - @Override - public void run() { - try { - Thread.sleep(1000); - server = new BlockingRpcServer(DummyProtocol.class, new DummyProtocolBlockingImpl(), address, 2); - } catch (Exception e) { - fail(e.getMessage()); - } - server.start(); - } - }); - serverThread.start(); - - RpcConnectionPool.RpcConnectionKey rpcConnectionKey = - new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, false); - client = new BlockingRpcClient(rpcConnectionKey, retries); - assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); - stub = client.getStub(); - - EchoMessage response = stub.echo(null, message); - assertEquals(MESSAGE, response.getMessage()); - } - - @Test - public void testConnectionFailed() throws Exception { - NettyClientBase client = null; - - try { - int port = server.getListenAddress().getPort() + 1; - RpcConnectionPool.RpcConnectionKey rpcConnectionKey = - new RpcConnectionPool.RpcConnectionKey( - RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), - 
DummyProtocol.class, false); - client = new BlockingRpcClient(rpcConnectionKey, retries); - assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); - client.close(); - } catch (Throwable ce){ - if (client != null) { - client.close(); - } - fail(); - } - } - - @Test - public void testGetNull() throws Exception { - assertNull(stub.getNull(null, null)); - assertTrue(service.getNullCalled); - } - - @Test - public void testShutdown() throws Exception { - final StringBuilder error = new StringBuilder(); - Thread callThread = new Thread() { - public void run() { - try { - EchoMessage message = EchoMessage.newBuilder() - .setMessage(MESSAGE) - .build(); - stub.deley(null, message); - } catch (Exception e) { - error.append(e.getMessage()); - } - synchronized(error) { - error.notifyAll(); - } - } - }; - - callThread.start(); - - final CountDownLatch latch = new CountDownLatch(1); - Thread shutdownThread = new Thread() { - public void run() { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - } - try { - server.shutdown(); - server = null; - latch.countDown(); - } catch (Throwable e) { - e.printStackTrace(); - } - } - }; - shutdownThread.start(); - - assertTrue(latch.await(5 * 1000, TimeUnit.MILLISECONDS)); - - assertTrue(latch.getCount() == 0); - - synchronized(error) { - error.wait(5 * 1000); - } - - if(!error.toString().isEmpty()) { - fail(error.toString()); - } - } - - @Test - @SetupRpcConnection(setupRpcClient=false) - public void testUnresolvedAddress() throws Exception { - String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); - RpcConnectionPool.RpcConnectionKey rpcConnectionKey = - new RpcConnectionPool.RpcConnectionKey( - RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false); - client = new BlockingRpcClient(rpcConnectionKey, retries); - assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); - BlockingInterface stub = client.getStub(); - - EchoMessage message = EchoMessage.newBuilder() - 
.setMessage(MESSAGE).build(); - EchoMessage response2 = stub.echo(null, message); - assertEquals(MESSAGE, response2.getMessage()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java deleted file mode 100644 index 0ca7563..0000000 --- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.rpc.test.impl; - -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.Interface; -import org.apache.tajo.rpc.test.TestProtos.EchoMessage; -import org.apache.tajo.rpc.test.TestProtos.SumRequest; -import org.apache.tajo.rpc.test.TestProtos.SumResponse; - -public class DummyProtocolAsyncImpl implements Interface { - private static final Log LOG = - LogFactory.getLog(DummyProtocolAsyncImpl.class); - public boolean getNullCalled = false; - public boolean getErrorCalled = false; - - @Override - public void sum(RpcController controller, SumRequest request, - RpcCallback<SumResponse> done) { - - SumResponse response = SumResponse.newBuilder().setResult( - request.getX1()+request.getX2()+request.getX3()+request.getX4() - ).build(); - done.run(response); - } - - @Override - public void echo(RpcController controller, EchoMessage request, - RpcCallback<EchoMessage> done) { - - done.run(request); - } - - @Override - public void getError(RpcController controller, EchoMessage request, - RpcCallback<EchoMessage> done) { - LOG.info("noCallback is called"); - getErrorCalled = true; - controller.setFailed(request.getMessage()); - done.run(request); - } - - @Override - public void getNull(RpcController controller, EchoMessage request, - RpcCallback<EchoMessage> done) { - getNullCalled = true; - LOG.info("noCallback is called"); - done.run(null); - } - - @Override - public void deley(RpcController controller, EchoMessage request, - RpcCallback<EchoMessage> done) { - try { - Thread.sleep(3000); - } catch (InterruptedException e) { - LOG.error(e.getMessage()); - } - - done.run(request); - } - - public void throwException(RpcController controller, EchoMessage request, - RpcCallback<EchoMessage> done) { - done.run(request); - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java deleted file mode 100644 index 8d4b597..0000000 --- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolBlockingImpl.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.rpc.test.impl; - -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface; -import org.apache.tajo.rpc.test.TestProtos.EchoMessage; -import org.apache.tajo.rpc.test.TestProtos.SumRequest; -import org.apache.tajo.rpc.test.TestProtos.SumResponse; - -public class DummyProtocolBlockingImpl implements BlockingInterface { - private static final Log LOG = - LogFactory.getLog(DummyProtocolBlockingImpl.class); - public boolean getNullCalled = false; - public boolean getErrorCalled = false; - - @Override - public SumResponse sum(RpcController controller, SumRequest request) - throws ServiceException { - return SumResponse.newBuilder().setResult( - request.getX1()+request.getX2()+request.getX3()+request.getX4() - ).build(); - } - - @Override - public EchoMessage echo(RpcController controller, EchoMessage request) - throws ServiceException { - return EchoMessage.newBuilder(). 
- setMessage(request.getMessage()).build(); - } - - @Override - public EchoMessage getError(RpcController controller, EchoMessage request) - throws ServiceException { - getErrorCalled = true; - controller.setFailed(request.getMessage()); - return request; - } - - @Override - public EchoMessage getNull(RpcController controller, EchoMessage request) - throws ServiceException { - getNullCalled = true; - LOG.info("noCallback is called"); - return null; - } - - @Override - public EchoMessage deley(RpcController controller, EchoMessage request) - throws ServiceException { - try { - Thread.sleep(3000); - } catch (InterruptedException e) { - //throw new ServiceException(e.getMessage(), e); - } - - return request; - } - - public EchoMessage throwException(RpcController controller, EchoMessage request) - throws ServiceException { - throw new ServiceException("Exception Test"); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/pom.xml b/tajo-rpc/tajo-rpc-common/pom.xml new file mode 100644 index 0000000..2b1cd7a --- /dev/null +++ b/tajo-rpc/tajo-rpc-common/pom.xml @@ -0,0 +1,216 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>tajo-project</artifactId> + <version>0.11.0-SNAPSHOT</version> + <groupId>org.apache.tajo</groupId> + <relativePath>../../tajo-project</relativePath> + </parent> + <packaging>jar</packaging> + <artifactId>tajo-rpc-common</artifactId> + <name>Tajo Rpc Common</name> + <description>Common Implementation for Netty</description> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.6</source> + <target>1.6</target> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <executions> + <execution> + <phase>verify</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <configuration> + </configuration> + <executions> + <execution> + <id>create-jar</id> + <phase>prepare-package</phase> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + <version>2.15</version> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> + 
</dependency> + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + <repositories> + <repository> + <id>repository.jboss.org</id> + <url>https://repository.jboss.org/nexus/content/repositories/releases/ + </url> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + </repositories> + + <profiles> + <profile> + <id>docs</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <execution> + <!-- build javadoc jars per jar for publishing to maven --> + <id>module-javadocs</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <destDir>${project.build.directory}</destDir> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>dist</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>dist</id> + <phase>package</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <target> + <echo file="${project.build.directory}/dist-layout-stitching.sh"> + run() { + echo "\$ ${@}" + "${@}" + res=$? + if [ $res != 0 ]; then + echo + echo "Failed!" 
+ echo + exit $res + fi + } + + ROOT=`cd ${basedir}/..;pwd` + echo + echo "Current directory `pwd`" + echo + run rm -rf ${project.artifactId}-${project.version} + run mkdir ${project.artifactId}-${project.version} + run cd ${project.artifactId}-${project.version} + run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar . + echo + echo "Tajo Rpc dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}" + echo + </echo> + <exec executable="sh" dir="${project.build.directory}" failonerror="true"> + <arg line="./dist-layout-stitching.sh" /> + </exec> + </target> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <reporting> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + <version>2.15</version> + </plugin> + </plugins> + </reporting> + +</project> http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java new file mode 100644 index 0000000..ad443d7 --- /dev/null +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.util.concurrent.GlobalEventExecutor; + +import java.io.IOException; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.util.Collections; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Base class for netty implementation. 
+ */ +public class NettyServerBase { + private static final Log LOG = LogFactory.getLog(NettyServerBase.class); + private static final String DEFAULT_PREFIX = "RpcServer_"; + private static final AtomicInteger sequenceId = new AtomicInteger(0); + + protected String serviceName; + protected InetSocketAddress serverAddr; + protected InetSocketAddress bindAddress; + protected ChannelInitializer<Channel> initializer; + protected ServerBootstrap bootstrap; + protected ChannelFuture channelFuture; + protected ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + private InetSocketAddress initIsa; + private Set<RpcEventListener> listeners = Collections.synchronizedSet(new HashSet<RpcEventListener>()); + + public NettyServerBase(InetSocketAddress address) { + this.initIsa = address; + } + + public NettyServerBase(String serviceName, InetSocketAddress addr) { + this.serviceName = serviceName; + this.initIsa = addr; + } + + public void setName(String name) { + this.serviceName = name; + } + + public void init(ChannelInitializer<Channel> initializer, int workerNum) { + for (RpcEventListener listener: listeners) { + listener.onBeforeInit(this); + } + + bootstrap = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum); + + this.initializer = initializer; + bootstrap + .channel(NioServerSocketChannel.class) + .childHandler(initializer) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) + .childOption(ChannelOption.SO_RCVBUF, 1048576 * 10); + + for (RpcEventListener listener: listeners) { + listener.onAfterInit(this); + } + } + + public InetSocketAddress getListenAddress() { + return this.bindAddress; + } + + public void start() { + for (RpcEventListener listener: listeners) { + listener.onBeforeStart(this); + } + + if 
(serviceName == null) { + this.serviceName = getNextDefaultServiceName(); + } + + if (initIsa.getPort() == 0) { + try { + int port = getUnusedPort(); + serverAddr = new InetSocketAddress(initIsa.getHostName(), port); + } catch (IOException e) { + LOG.error(e, e); + } + } else { + serverAddr = initIsa; + } + + this.channelFuture = bootstrap.clone().bind(serverAddr).syncUninterruptibly(); + this.bindAddress = (InetSocketAddress) channelFuture.channel().localAddress(); + + for (RpcEventListener listener: listeners) { + listener.onAfterStart(this); + } + LOG.info("Rpc (" + serviceName + ") listens on " + this.bindAddress); + } + + public Channel getChannel() { + return this.channelFuture.channel(); + } + + public void shutdown() { + shutdown(false); + } + + public void shutdown(boolean waitUntilThreadsStop) { + for (RpcEventListener listener: listeners) { + listener.onBeforeShutdown(this); + } + + try { + accepted.close(); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + } + + if(bootstrap != null) { + if (bootstrap.childGroup() != null) { + bootstrap.childGroup().shutdownGracefully(); + if (waitUntilThreadsStop) { + bootstrap.childGroup().terminationFuture().awaitUninterruptibly(); + } + } + + if (bootstrap.group() != null) { + bootstrap.group().shutdownGracefully(); + if (waitUntilThreadsStop) { + bootstrap.childGroup().terminationFuture().awaitUninterruptibly(); + } + } + } + + for (RpcEventListener listener: listeners) { + listener.onAfterShutdown(this); + } + + if (bindAddress != null) { + LOG.info("Rpc (" + serviceName + ") listened on " + + RpcUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown"); + } + } + + private static String getNextDefaultServiceName() { + return DEFAULT_PREFIX + sequenceId.getAndIncrement(); + } + + private static final int startPortRange = 10000; + private static final int endPortRange = 50000; + private static final Random rnd = new Random(System.currentTimeMillis()); + // each system has a different starting port 
number within the given range. + private static final AtomicInteger nextPortNum = + new AtomicInteger(startPortRange+ rnd.nextInt(endPortRange - startPortRange)); + private static final Object lockObject = new Object(); + + + private synchronized static int getUnusedPort() throws IOException { + while (true) { + int port = nextPortNum.getAndIncrement(); + if (port >= endPortRange) { + synchronized (lockObject) { + nextPortNum.set(startPortRange); + port = nextPortNum.getAndIncrement(); + } + } + if (available(port)) { + return port; + } + } + } + + private static boolean available(int port) throws IOException { + if (port < 1024 || port > 65535) { + throw new IllegalArgumentException("Port Number Out of Bound: " + port); + } + + ServerSocket ss = null; + DatagramSocket ds = null; + + try { + ss = new ServerSocket(port); + ss.setReuseAddress(true); + + ds = new DatagramSocket(port); + ds.setReuseAddress(true); + + return true; + + } catch (IOException e) { + return false; + } finally { + if (ss != null) { + ss.close(); + } + + if (ds != null) { + ds.close(); + } + } + } + + public void addListener(RpcEventListener listener) { + listeners.add(listener); + } + + public void removeListener(RpcEventListener listener) { + listeners.remove(listener); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/a9ae3cab/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RemoteException.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RemoteException.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RemoteException.java new file mode 100644 index 0000000..30c110d --- /dev/null +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RemoteException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/**
 * Unchecked exception type of the RPC module; it adds no state or behaviour beyond
 * {@link RuntimeException} and simply forwards its constructor arguments.
 */
public class RemoteException extends RuntimeException {
  // Declared explicitly so the serialized form does not depend on a compiler-generated id.
  private static final long serialVersionUID = 1L;

  /** Creates an exception with no detail message and no cause. */
  public RemoteException() {
    super();
  }

  /**
   * @param message the detail message
   */
  public RemoteException(String message) {
    super(message);
  }

  /**
   * @param t the underlying cause
   */
  public RemoteException(Throwable t) {
    super(t);
  }

  /**
   * @param message the detail message
   * @param t the underlying cause
   */
  public RemoteException(String message, Throwable t) {
    super(message, t);
  }
}
import java.io.IOException;
import java.util.Date;
import java.util.List;

/**
 * Thrown when an operation has been retried and every attempt failed. The message can be
 * given directly or assembled from the list of failures collected along the way.
 */
public class RetriesExhaustedException extends RuntimeException {
  private static final long serialVersionUID = 1876775844L;

  /**
   * @param msg the detail message
   */
  public RetriesExhaustedException(final String msg) {
    super(msg);
  }

  /**
   * @param msg the detail message
   * @param e the I/O failure that caused the retries to be given up
   */
  public RetriesExhaustedException(final String msg, final IOException e) {
    super(msg, e);
  }

  /**
   * Data structure that attaches a timestamp and free-form context to a Throwable.
   */
  public static class ThrowableWithExtraContext {
    private final Throwable t;
    private final long when;
    private final String extras;

    /**
     * @param t the failure being recorded
     * @param when time of the failure, in milliseconds since the epoch
     * @param extras additional context to print next to the failure
     */
    public ThrowableWithExtraContext(final Throwable t, final long when,
        final String extras) {
      this.t = t;
      this.when = when;
      this.extras = extras;
    }

    @Override
    public String toString() {
      // String concatenation prints "null" for a missing throwable instead of throwing.
      return new Date(this.when).toString() + ", " + extras + ", " + t;
    }
  }

  /**
   * Create a new RetriesExhaustedException from the list of prior failures.
   * @param callableVitals Description of the call that was being retried when this
   *     exception was raised
   * @param numTries Zero-based index of the last attempt; the message reports
   *     {@code numTries + 1} attempts
   * @param exceptions List of exceptions that failed before giving up; may be null
   */
  public RetriesExhaustedException(final String callableVitals, int numTries,
      List<Throwable> exceptions) {
    super(getMessage(callableVitals, numTries, exceptions));
  }

  /**
   * Create a new RetriesExhaustedException from the list of prior failures.
   * @param numTries Zero-based index of the last attempt; the message reports
   *     {@code numTries + 1} attempts
   * @param exceptions List of exceptions that failed before giving up; may be null
   */
  public RetriesExhaustedException(final int numTries,
      final List<Throwable> exceptions) {
    super(getMessage(numTries, exceptions));
  }

  private static String getMessage(String callableVitals, int numTries,
      List<Throwable> exceptions) {
    StringBuilder buffer = new StringBuilder("Failed contacting ");
    buffer.append(callableVitals);
    buffer.append(" after ");
    buffer.append(numTries + 1);
    buffer.append(" attempts.\nExceptions:\n");
    appendExceptions(buffer, exceptions);
    return buffer.toString();
  }

  private static String getMessage(final int numTries,
      final List<Throwable> exceptions) {
    StringBuilder buffer = new StringBuilder("Failed after attempts=");
    buffer.append(numTries + 1);
    buffer.append(", exceptions:\n");
    appendExceptions(buffer, exceptions);
    return buffer.toString();
  }

  /** Appends one line per failure; a null list adds nothing and a null entry prints "null". */
  private static void appendExceptions(StringBuilder buffer, List<Throwable> exceptions) {
    if (exceptions == null) {
      return;
    }
    for (Throwable t : exceptions) {
      buffer.append(String.valueOf(t));
      buffer.append("\n");
    }
  }
}
