Repository: tajo Updated Branches: refs/heads/master 411a26d5d -> 4fd1739d4
TAJO-1935: Some Tasks don't work after they become TA_ASSIGNED. Closes #864 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4fd1739d Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4fd1739d Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4fd1739d Branch: refs/heads/master Commit: 4fd1739d491f547757eaa9d26e37a496eca02d92 Parents: 411a26d Author: Jinho Kim <[email protected]> Authored: Wed Nov 18 13:53:30 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Wed Nov 18 13:53:30 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 ++ .../java/org/apache/tajo/master/QueryInProgress.java | 6 ++---- .../apache/tajo/querymaster/DefaultTaskScheduler.java | 14 +++++++------- .../java/org/apache/tajo/querymaster/QueryMaster.java | 11 ++++++----- .../org/apache/tajo/worker/ExecutionBlockContext.java | 12 +++++++----- .../org/apache/tajo/worker/NodeStatusUpdater.java | 6 +----- .../main/java/org/apache/tajo/worker/TaskManager.java | 5 +---- .../main/java/org/apache/tajo/rpc/RpcConstants.java | 1 - 8 files changed, 26 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/4fd1739d/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 10fa37d..12b11be 100644 --- a/CHANGES +++ b/CHANGES @@ -35,6 +35,8 @@ Release 0.12.0 - unreleased BUG FIXES + TAJO-1935: Some Tasks don't work after they become TA_ASSIGNED. (jinho) + TAJO-1977: Cannot recognize the space-contained tablename and databasename. (Contributed by Dongkyu Hwangbo, committed by jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/4fd1739d/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index f76e7f0..b474d07 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -35,7 +35,6 @@ import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcClientManager; -import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.session.Session; import org.apache.tajo.util.NetUtils; @@ -44,7 +43,6 @@ import org.apache.tajo.util.RpcParameterFactory; import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.Properties; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -105,7 +103,7 @@ public class QueryInProgress { if (queryMasterRpcClient != null) { CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<>(); queryMasterRpcClient.killQuery(callFuture.getController(), queryId.getProto(), callFuture); - callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + callFuture.get(); } } catch (Throwable e) { catchException("Failed to kill query " + queryId + " by exception " + e, e); @@ -222,7 +220,7 @@ public class QueryInProgress { CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<>(); queryMasterRpcClient.executeQuery(callFuture.getController(), builder.build(), callFuture); - callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + callFuture.get(); querySubmitted = true; getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED); http://git-wip-us.apache.org/repos/asf/tajo/blob/4fd1739d/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index f45f64d..f1c0f62 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -41,7 +41,10 @@ import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptSched import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; import org.apache.tajo.plan.serder.LogicalNodeSerializer; import org.apache.tajo.resource.NodeResources; -import org.apache.tajo.rpc.*; +import org.apache.tajo.rpc.AsyncRpcClient; +import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.storage.DataLocation; import org.apache.tajo.storage.fragment.FileFragment; @@ -54,7 +57,6 @@ import org.apache.tajo.worker.FetchImpl; import java.net.InetSocketAddress; import java.util.*; import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -314,7 +316,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { .setQueue(context.getMasterContext().getQueryContext().get("queue", "default")); //TODO set queue masterClientService.reserveNodeResources(callBack.getController(), request.build(), callBack); - NodeResourceResponse response = callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + NodeResourceResponse response = callBack.get(); for (AllocationResourceProto resource : response.getResourceList()) { taskRequestEvents.add(new TaskRequestEvent(resource.getWorkerId(), resource, context.getBlockId())); @@ -896,8 +898,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.allocateTasks(callFuture.getController(), requestProto.build(), callFuture); - BatchAllocationResponse responseProto = - callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + BatchAllocationResponse responseProto = callFuture.get(); if (responseProto.getCancellationTaskCount() > 0) { for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) { @@ -1015,8 +1016,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.allocateTasks(callFuture.getController(), requestProto.build(), callFuture); - BatchAllocationResponse - responseProto = callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + BatchAllocationResponse responseProto = callFuture.get(); if(responseProto.getCancellationTaskCount() > 0) { for (TaskAllocationProto proto : responseProto.getCancellationTaskList()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/4fd1739d/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index 6b26ddf..7104fb9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -42,7 +42,10 @@ import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService; import org.apache.tajo.master.event.QueryStartEvent; import org.apache.tajo.master.event.QueryStopEvent; -import org.apache.tajo.rpc.*; +import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.RpcParameterFactory; @@ -56,7 +59,6 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; public class QueryMaster extends CompositeService implements EventHandler { private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName()); @@ -180,8 +182,7 @@ public class QueryMaster extends CompositeService implements EventHandler { masterService.getAllWorkers(callBack.getController(), PrimitiveProtos.NullProto.getDefaultInstance(), callBack); - WorkerConnectionsResponse connectionsProto = - callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + WorkerConnectionsResponse connectionsProto = callBack.get(); return connectionsProto.getWorkerList(); } catch (Exception e) { LOG.error(e.getMessage(), e); @@ -300,7 +301,7 @@ public class QueryMaster extends CompositeService implements EventHandler { QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); masterClientService.heartbeat(future.getController(), queryHeartbeat, future); - future.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + future.get(); } catch (Exception e) { //this function will be closed in new thread. //When tajo do stop cluster, tajo master maybe throw closed connection exception http://git-wip-us.apache.org/repos/asf/tajo/blob/4fd1739d/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index db28433..9feae7e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -36,7 +36,10 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.pullserver.TajoPullServerService; -import org.apache.tajo.rpc.*; +import org.apache.tajo.rpc.AsyncRpcClient; +import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.util.Pair; @@ -47,7 +50,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -253,7 +255,7 @@ public class ExecutionBlockContext { //If QueryMaster does not responding, current execution block should be stop CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<>(); getStub().fatalError(callFuture.getController(), builder.build(), callFuture); - callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + callFuture.get(); } catch (Exception e) { getWorkerContext().getTaskManager().getDispatcher().getEventHandler() .handle(new ExecutionBlockErrorEvent(taskAttemptId.getTaskId().getExecutionBlockId(), e)); @@ -300,7 +302,7 @@ public class ExecutionBlockContext { CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<>(); stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture); - callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + callFuture.get(); return; } @@ -355,7 +357,7 @@ public class ExecutionBlockContext { try { CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<>(); stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture); - callFuture.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + callFuture.get(); } catch (Throwable e) { // can't send report to query master LOG.fatal(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/tajo/blob/4fd1739d/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java index 3ca0bfb..162e5e9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java @@ -32,7 +32,6 @@ import org.apache.tajo.resource.NodeResources; import org.apache.tajo.rpc.AsyncRpcClient; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.RpcClientManager; -import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.RpcParameterFactory; @@ -46,7 +45,6 @@ import java.util.Properties; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static org.apache.tajo.ResourceProtos.*; @@ -168,11 +166,9 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N CallFuture<NodeHeartbeatResponse> callBack = new CallFuture<>(); resourceTracker.nodeHeartbeat(callBack.getController(), requestProto, callBack); - response = callBack.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + response = callBack.get(); } catch (InterruptedException e) { LOG.warn(e.getMessage()); - } catch (TimeoutException te) { - LOG.warn("Heartbeat response is being delayed.", te); } catch (ExecutionException ee) { LOG.warn("TajoMaster failure: " + ee.getMessage()); resourceTracker = null; http://git-wip-us.apache.org/repos/asf/tajo/blob/4fd1739d/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java index 18e9762..9e2e9e8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskManager.java @@ -33,7 +33,6 @@ import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.rpc.AsyncRpcClient; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.RpcClientManager; -import org.apache.tajo.rpc.RpcConstants; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.RpcParameterFactory; import org.apache.tajo.util.TUtil; @@ -44,7 +43,6 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.TimeUnit; import static org.apache.tajo.ResourceProtos.*; @@ -125,8 +123,7 @@ public class TaskManager extends AbstractService implements EventHandler<TaskMan CallFuture<ExecutionBlockContextResponse> callback = new CallFuture<>(); stub.getExecutionBlockContext(callback.getController(), request.build(), callback); - ExecutionBlockContextResponse contextProto = - callback.get(RpcConstants.FUTURE_TIMEOUT_SECONDS_DEFAULT, TimeUnit.SECONDS); + ExecutionBlockContextResponse contextProto = callback.get(); ExecutionBlockContext context = new ExecutionBlockContext(getWorkerContext(), contextProto, client); context.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/4fd1739d/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java index 95e5ae4..fec0e6b 100644 --- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java @@ -27,7 +27,6 @@ public class RpcConstants { public static final String PING_PACKET = "TAJO"; public static final int DEFAULT_PAUSE = 1000; // 1 sec - public static final int FUTURE_TIMEOUT_SECONDS_DEFAULT = 10; /** How many times the connect will retry */ public static final String CLIENT_RETRY_NUM = "tajo.rpc.client.retry-num";
