TAJO-1405: Fix some illegal usages of the connection pool. (Contributed by navis, Committed by Keuntae Park)
Closes #425 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/286b9567 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/286b9567 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/286b9567 Branch: refs/heads/index_support Commit: 286b956795a4dc2efb72c97896d86ed1049485e3 Parents: 0dc7d68 Author: Keuntae Park <[email protected]> Authored: Tue Mar 17 13:47:20 2015 +0900 Committer: Keuntae Park <[email protected]> Committed: Tue Mar 17 13:47:20 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../apache/tajo/client/SessionConnection.java | 18 +++-- .../tajo/worker/ExecutionBlockContext.java | 43 +++++++---- .../main/java/org/apache/tajo/worker/Task.java | 75 +++++++++++--------- .../java/org/apache/tajo/worker/TaskRunner.java | 9 ++- .../org/apache/tajo/rpc/AsyncRpcClient.java | 9 ++- .../org/apache/tajo/rpc/NettyClientBase.java | 6 ++ 7 files changed, 103 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 9d2cd14..0d7222f 100644 --- a/CHANGES +++ b/CHANGES @@ -29,6 +29,9 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1405: Fix some illegal way of usages on connection pool. + (Contributed by navis, Committed by Keuntae Park) + TAJO-1384: Duplicated output file path problem. (jihoon) TAJO-1386: CURRENT_DATE generates parsing errors sometimes. 
http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index d24e7b3..c084d95 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -57,7 +57,7 @@ public class SessionConnection implements Closeable { final RpcConnectionPool connPool; - private final String baseDatabase; + private String baseDatabase; private final UserRoleInfo userInfo; @@ -260,7 +260,8 @@ public class SessionConnection implements Closeable { } public Boolean selectDatabase(final String databaseName) throws ServiceException { - return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) { + Boolean selected = new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), + TajoMasterClientProtocol.class, false, true) { public Boolean call(NettyClientBase client) throws ServiceException { checkSessionAndGet(client); @@ -269,6 +270,11 @@ public class SessionConnection implements Closeable { return tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue(); } }.withRetries(); + + if (selected == Boolean.TRUE) { + this.baseDatabase = databaseName; + } + return selected; } @Override @@ -278,13 +284,15 @@ public class SessionConnection implements Closeable { } // remove session + NettyClientBase client = null; try { - - NettyClientBase client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false); + client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false); TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub(); 
tajoMaster.removeSession(null, sessionId); - } catch (Throwable e) { + // ignore + } finally { + connPool.releaseConnection(client); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index a645689..2377720 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -36,13 +36,11 @@ import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.Pair; -import org.apache.tajo.worker.event.TaskRunnerStartEvent; import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoopGroup; @@ -139,7 +137,17 @@ public class ExecutionBlockContext { try{ this.resource.initialize(queryContext, plan); } catch (Throwable e) { - getQueryMasterStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get()); + try { + NettyClientBase client = getQueryMasterConnection(); + try { + QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub(); + stub.killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get()); + } finally { + connPool.releaseConnection(client); + } + } catch (Throwable t) { + //ignore + } throw e; } } @@ -148,15 +156,13 @@ public class ExecutionBlockContext { return resource; } - public 
QueryMasterProtocol.QueryMasterProtocolService.Interface getQueryMasterStub() + public NettyClientBase getQueryMasterConnection() throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { - NettyClientBase clientBase = null; - try { - clientBase = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true); - return clientBase.getStub(); - } finally { - connPool.releaseConnection(clientBase); - } + return connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true); + } + + public void releaseConnection(NettyClientBase connection) { + connPool.releaseConnection(connection); } public void stop(){ @@ -267,7 +273,13 @@ public class ExecutionBlockContext { } private void sendExecutionBlockReport(ExecutionBlockReport reporter) throws Exception { - getQueryMasterStub().doneExecutionBlock(null, reporter, NullCallback.get()); + NettyClientBase client = getQueryMasterConnection(); + try { + QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub(); + stub.doneExecutionBlock(null, reporter, NullCallback.get()); + } finally { + connPool.releaseConnection(client); + } } protected void reportExecutionBlock(ExecutionBlockId ebId) { @@ -361,12 +373,14 @@ public class ExecutionBlockContext { return new Runnable() { int remainingRetries = MAX_RETRIES; - QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub; @Override public void run() { while (!reporterStop.get() && !Thread.interrupted()) { + + NettyClientBase client = null; try { - masterStub = getQueryMasterStub(); + client = getQueryMasterConnection(); + QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub = client.getStub(); if(tasks.size() == 0){ masterStub.ping(null, getExecutionBlockId().getProto(), NullCallback.get()); @@ -391,6 +405,7 @@ public class ExecutionBlockContext { throw new RuntimeException(t); } } finally { + releaseConnection(client); if (remainingRetries > 0 && !reporterStop.get()) { synchronized (reporterThread) { try 
{ http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 524b09b..9ff18dd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -52,6 +52,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType; import org.apache.tajo.plan.logical.*; import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.pullserver.retriever.FileChunk; +import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; @@ -424,46 +425,52 @@ public class Task { executionBlockContext.completedTasksNum.incrementAndGet(); context.getHashShuffleAppenderManager().finalizeTask(taskId); - QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getQueryMasterStub(); - if (context.isStopped()) { - context.setExecutorProgress(0.0f); - if(context.getState() == TaskAttemptState.TA_KILLED) { - queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); - executionBlockContext.killedTasksNum.incrementAndGet(); - } else { - context.setState(TaskAttemptState.TA_FAILED); - TaskFatalErrorReport.Builder errorBuilder = - TaskFatalErrorReport.newBuilder() - .setId(getId().getProto()); - if (error != null) { - if (error.getMessage() == null) { - errorBuilder.setErrorMessage(error.getClass().getCanonicalName()); - } else { - errorBuilder.setErrorMessage(error.getMessage()); + NettyClientBase client = executionBlockContext.getQueryMasterConnection(); + try { + QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = client.getStub(); + if (context.isStopped()) { + 
context.setExecutorProgress(0.0f); + + if (context.getState() == TaskAttemptState.TA_KILLED) { + queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); + executionBlockContext.killedTasksNum.incrementAndGet(); + } else { + context.setState(TaskAttemptState.TA_FAILED); + TaskFatalErrorReport.Builder errorBuilder = + TaskFatalErrorReport.newBuilder() + .setId(getId().getProto()); + if (error != null) { + if (error.getMessage() == null) { + errorBuilder.setErrorMessage(error.getClass().getCanonicalName()); + } else { + errorBuilder.setErrorMessage(error.getMessage()); + } + errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); } - errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); + + queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); + executionBlockContext.failedTasksNum.incrementAndGet(); } + } else { + // if successful + context.setProgress(1.0f); + context.setState(TaskAttemptState.TA_SUCCEEDED); + executionBlockContext.succeededTasksNum.incrementAndGet(); - queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); - executionBlockContext.failedTasksNum.incrementAndGet(); + TaskCompletionReport report = getTaskCompletionReport(); + queryMasterStub.done(null, report, NullCallback.get()); } - } else { - // if successful - context.setProgress(1.0f); - context.setState(TaskAttemptState.TA_SUCCEEDED); - executionBlockContext.succeededTasksNum.incrementAndGet(); - - TaskCompletionReport report = getTaskCompletionReport(); - queryMasterStub.done(null, report, NullCallback.get()); + finishTime = System.currentTimeMillis(); + LOG.info(context.getTaskId() + " completed. 
" + + "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() + + ", succeeded: " + executionBlockContext.succeededTasksNum.intValue() + + ", killed: " + executionBlockContext.killedTasksNum.intValue() + + ", failed: " + executionBlockContext.failedTasksNum.intValue()); + cleanupTask(); + } finally { + executionBlockContext.releaseConnection(client); } - finishTime = System.currentTimeMillis(); - LOG.info(context.getTaskId() + " completed. " + - "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() + - ", succeeded: " + executionBlockContext.succeededTasksNum.intValue() - + ", killed: " + executionBlockContext.killedTasksNum.intValue() - + ", failed: " + executionBlockContext.failedTasksNum.intValue()); - cleanupTask(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index 79725f6..058ea42 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -35,6 +35,7 @@ import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.container.TajoContainerIdPBImpl; import org.apache.tajo.master.container.TajoConverterUtils; import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import io.netty.channel.ConnectTimeoutException; @@ -196,9 +197,9 @@ public class TaskRunner extends AbstractService { TaskRequestProto taskRequest = null; while(!stopped) { - QueryMasterProtocolService.Interface qmClientService; + NettyClientBase client; try { - qmClientService = getContext().getQueryMasterStub(); + client = executionBlockContext.getQueryMasterConnection(); } 
catch (ConnectTimeoutException ce) { // NettyClientBase throws ConnectTimeoutException if connection was failed stop(); @@ -212,6 +213,8 @@ public class TaskRunner extends AbstractService { break; } + QueryMasterProtocolService.Interface qmClientService = client.getStub(); + try { if (callFuture == null) { callFuture = new CallFuture<TaskRequestProto>(); @@ -296,6 +299,8 @@ public class TaskRunner extends AbstractService { } } catch (Throwable t) { LOG.fatal(t.getMessage(), t); + } finally { + executionBlockContext.releaseConnection(client); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java index 1ea9fb1..3d856ce 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java @@ -35,11 +35,12 @@ import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; public class AsyncRpcClient extends NettyClientBase { private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class); - private final Map<Integer, ResponseCallback> requests = + private final ConcurrentMap<Integer, ResponseCallback> requests = new ConcurrentHashMap<Integer, ResponseCallback>(); private final Method stubMethod; @@ -178,14 +179,12 @@ public class AsyncRpcClient extends NettyClientBase { @ChannelHandler.Sharable private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter { - synchronized void registerCallback(int seqId, ResponseCallback callback) { + void registerCallback(int seqId, ResponseCallback callback) { - if (requests.containsKey(seqId)) { + if (requests.putIfAbsent(seqId, callback) != 
null) { throw new RemoteException( getErrorMessage("Duplicate Sequence Id "+ seqId)); } - - requests.put(seqId, callback); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/286b9567/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java index 7dfc5a2..72278f2 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java @@ -132,6 +132,12 @@ public abstract class NettyClientBase implements Closeable { final CountDownLatch ticket = new CountDownLatch(1); final CountDownLatch granted = connect.check(ticket); + // basically, it's double checked lock + if (ticket == granted && isConnected()) { + granted.countDown(); + return true; + } + if (ticket == granted) { connectUsingNetty(addr, new RetryConnectionListener(addr, granted)); }
