Repository: tajo Updated Branches: refs/heads/master 015913b7a -> 4595375f7
http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index 1496b62..a30df54 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -44,6 +44,7 @@ import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.history.QueryHistory; import org.apache.tajo.worker.TajoWorker; @@ -229,23 +230,9 @@ public class QueryMaster extends CompositeService implements EventHandler { // worker may fail to connect existing active master. Thus, // if worker can't connect the master, worker should try to connect another master and // update master address in worker context. - if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - QueryCoordinatorProtocol.class, true); - } catch (Exception e) { - queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( - HAServiceUtil.getResourceTrackerAddress(systemConf)); - queryMasterContext.getWorkerContext().setTajoMasterAddress( - HAServiceUtil.getMasterUmbilicalAddress(systemConf)); - rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - QueryCoordinatorProtocol.class, true); - } - } else { - rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - QueryCoordinatorProtocol.class, true); - } + ServiceTracker serviceTracker = workerContext.getServiceTracker(); + rpc = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); QueryCoordinatorProtocolService masterService = rpc.getStub(); CallFuture<WorkerResourcesRequest> callBack = new CallFuture<WorkerResourcesRequest>(); @@ -353,24 +340,8 @@ public class QueryMaster extends CompositeService implements EventHandler { NettyClientBase tmClient = null; try { - // In TajoMaster HA mode, if backup master be active status, - // worker may fail to connect existing active master. Thus, - // if worker can't connect the master, worker should try to connect another master and - // update master address in worker context. - if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - QueryCoordinatorProtocol.class, true); - } catch (Exception e) { - queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf)); - queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf)); - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - QueryCoordinatorProtocol.class, true); - } - } else { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - QueryCoordinatorProtocol.class, true); - } + tmClient = connPool.getConnection(workerContext.getServiceTracker().getUmbilicalAddress(), + QueryCoordinatorProtocol.class, true); QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); masterClientService.heartbeat(future.getController(), queryHeartbeat, future); @@ -474,32 +445,13 @@ public class QueryMaster extends CompositeService implements EventHandler { for(QueryMasterTask eachTask: tempTasks) { NettyClientBase tmClient; try { - // In TajoMaster HA mode, if backup master be active status, - // worker may fail to connect existing active master. Thus, - // if worker can't connect the master, worker should try to connect another master and - // update master address in worker context. - if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - QueryCoordinatorProtocol.class, true); - } catch (Exception e) { - queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( - HAServiceUtil.getResourceTrackerAddress(systemConf)); - queryMasterContext.getWorkerContext().setTajoMasterAddress( - HAServiceUtil.getMasterUmbilicalAddress(systemConf)); - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - QueryCoordinatorProtocol.class, true); - } - } else { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - QueryCoordinatorProtocol.class, true); - } + ServiceTracker serviceTracker = queryMasterContext.getWorkerContext().getServiceTracker(); + tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), + QueryCoordinatorProtocol.class, true); QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); - CallFuture<QueryCoordinatorProtocol.TajoHeartbeatResponse> callBack = - new CallFuture<QueryCoordinatorProtocol.TajoHeartbeatResponse>(); - + CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>(); TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask); masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java index 82fb37f..13f4dcc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java @@ -23,16 +23,16 @@ import org.apache.tajo.catalog.FunctionDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.master.TajoMaster.MasterContext; -import org.apache.tajo.ha.HAService; import org.apache.tajo.master.QueryInProgress; +import org.apache.tajo.master.TajoMaster.MasterContext; import org.apache.tajo.querymaster.QueryMasterTask; -import org.apache.tajo.querymaster.Task; import org.apache.tajo.querymaster.Stage; -import org.apache.tajo.util.history.TaskHistory; +import org.apache.tajo.querymaster.Task; +import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.history.StageHistory; -import org.apache.tajo.worker.TaskRunnerHistory; +import org.apache.tajo.util.history.TaskHistory; import org.apache.tajo.worker.TaskRunner; +import org.apache.tajo.worker.TaskRunnerHistory; import java.text.DecimalFormat; import java.util.*; @@ -191,7 +191,7 @@ public class JSPUtil { } public static String getMasterActiveLabel(MasterContext context) { - HAService haService = context.getHAService(); + ServiceTracker haService = context.getHAService(); String activeLabel = ""; if (haService != null) { if (haService.isActiveStatus()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index dd408c9..c6a06f0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -51,6 +51,7 @@ import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.ApplicationIdUtils; import java.net.InetSocketAddress; @@ -274,31 +275,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { RpcConnectionPool connPool = RpcConnectionPool.getPool(); NettyClientBase tmClient = null; try { - - // In TajoMaster HA mode, if backup master be active status, - // worker may fail to connect existing active master. Thus, - // if worker can't connect the master, worker should try to connect another master and - // update master address in worker context. - if (tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - tmClient = connPool.getConnection( - queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - QueryCoordinatorProtocol.class, true); - } catch (Exception e) { - queryTaskContext.getQueryMasterContext().getWorkerContext(). - setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf)); - queryTaskContext.getQueryMasterContext().getWorkerContext(). - setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf)); - tmClient = connPool.getConnection( - queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - QueryCoordinatorProtocol.class, true); - } - } else { - tmClient = connPool.getConnection( - queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - QueryCoordinatorProtocol.class, true); - } - + ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker(); + tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); masterClientService.allocateWorkerResources(null, request, callBack); } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index c217e3e..7f73916 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -34,8 +34,9 @@ import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.CatalogClient; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ha.HAServiceUtil; -import org.apache.tajo.ha.TajoMasterInfo; +import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.service.ServiceTrackerFactory; +import org.apache.tajo.service.TajoMasterInfo; import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.rm.TajoWorkerResourceManager; @@ -63,7 +64,6 @@ import java.io.*; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -112,6 +112,8 @@ public class TajoWorker extends CompositeService { @Deprecated private boolean taskRunnerMode; + private ServiceTracker serviceTracker; + private WorkerHeartbeatService workerHeartbeatThread; private AtomicBoolean stopped = new AtomicBoolean(false); @@ -189,6 +191,8 @@ public class TajoWorker extends CompositeService { this.systemConf = (TajoConf)conf; RackResolver.init(systemConf); + serviceTracker = ServiceTrackerFactory.get(systemConf); + this.workerContext = new WorkerContext(); this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); @@ -352,8 +356,8 @@ public class TajoWorker extends CompositeService { tajoMasterInfo = new TajoMasterInfo(); if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf)); - tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf)); + tajoMasterInfo.setTajoMasterAddress(serviceTracker.getUmbilicalAddress()); + tajoMasterInfo.setWorkerResourceTrackerAddr(serviceTracker.getResourceTrackerAddress()); } else { tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars .TAJO_MASTER_UMBILICAL_RPC_ADDRESS))); @@ -425,18 +429,14 @@ public class TajoWorker extends CompositeService { return systemConf; } - public TajoWorkerManagerService getTajoWorkerManagerService() { - return tajoWorkerManagerService; + public ServiceTracker getServiceTracker() { + return serviceTracker; } public QueryMasterManagerService getQueryMasterManagerService() { return queryMasterManagerService; } - public TajoWorkerClientService getTajoWorkerClientService() { - return tajoWorkerClientService; - } - public TaskRunnerManager getTaskRunnerManager() { return taskRunnerManager; } @@ -521,10 +521,6 @@ public class TajoWorker extends CompositeService { TajoWorker.this.numClusterNodes.set(numClusterNodes); } - public int getNumClusterNodes() { - return TajoWorker.this.numClusterNodes.get(); - } - public void setClusterResource(ClusterResourceSummary clusterResource) { synchronized (numClusterNodes) { TajoWorker.this.clusterResource = clusterResource; @@ -537,26 +533,6 @@ public class TajoWorker extends CompositeService { } } - public InetSocketAddress getTajoMasterAddress() { - return tajoMasterInfo.getTajoMasterAddress(); - } - - public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) { - tajoMasterInfo.setTajoMasterAddress(tajoMasterAddress); - } - - public InetSocketAddress getResourceTrackerAddress() { - return tajoMasterInfo.getWorkerResourceTrackerAddr(); - } - - public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) { - tajoMasterInfo.setWorkerResourceTrackerAddr(workerResourceTrackerAddr); - } - - public int getPeerRpcPort() { - return getTajoWorkerManagerService() == null ? 0 : getTajoWorkerManagerService().getBindAddr().getPort(); - } - public boolean isQueryMasterMode() { return queryMasterMode; } http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index 0b815d8..c0a6453 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@ -119,7 +119,7 @@ public class TajoWorkerClientService extends AbstractService { QueryId queryId = new QueryId(request.getQueryId()); QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true); - QueryHistory queryHistory; + QueryHistory queryHistory = null; if (queryMasterTask == null) { queryHistory = workerContext.getHistoryReader().getQueryHistory(queryId.toString()); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java index 676c72b..870e9a0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java @@ -35,6 +35,7 @@ import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.storage.DiskDeviceInfo; import org.apache.tajo.storage.DiskMountInfo; import org.apache.tajo.storage.DiskUtil; @@ -183,22 +184,9 @@ public class WorkerHeartbeatService extends AbstractService { try { CallFuture<TajoHeartbeatResponse> callBack = new CallFuture<TajoHeartbeatResponse>(); - // In TajoMaster HA mode, if backup master be active status, - // worker may fail to connect existing active master. Thus, - // if worker can't connect the master, worker should try to connect another master and - // update master address in worker context. - if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true); - } catch (Exception e) { - context.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf)); - context.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf)); - rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true); - } - } else { - rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true); - } - + ServiceTracker serviceTracker = context.getServiceTracker(); + rmClient = connectionPool.getConnection(serviceTracker.getResourceTrackerAddress(), + TajoResourceTrackerProtocol.class, true); TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub(); resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack); http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java index 9d9f39c..4b76c73 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java @@ -26,6 +26,8 @@ import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rule.*; import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode; +import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.NetUtils; import org.apache.tajo.worker.TajoWorker; @@ -45,13 +47,8 @@ public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule { InetSocketAddress masterAddress = null; try { - if (tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - masterAddress = HAServiceUtil.getMasterUmbilicalAddress(tajoConf); - } else { - masterAddress = NetUtils.createSocketAddr(tajoConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS)); - } - masterClient = pool.getConnection(masterAddress, QueryCoordinatorProtocol.class, true); - + ServiceTracker serviceTracker = ServiceTrackerFactory.get(tajoConf); + masterClient = pool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); masterClient.getStub(); } finally { if (masterClient != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/resources/webapps/admin/catalogview.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp index bc770d7..1ff81a6 100644 --- a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp +++ b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp @@ -24,12 +24,13 @@ <%@ page import="org.apache.tajo.catalog.TableDesc" %> <%@ page import="org.apache.tajo.catalog.partition.PartitionMethodDesc" %> <%@ page import="org.apache.tajo.master.TajoMaster" %> -<%@ page import="org.apache.tajo.ha.HAService" %> +<%@ page import="org.apache.tajo.service.ServiceTracker" %> <%@ page import="org.apache.tajo.util.FileUtil" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="java.util.Collection" %> <%@ page import="java.util.List" %> <%@ page import="java.util.Map" %> +<%@ page import="org.apache.tajo.service.ServiceTracker" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); CatalogService catalog = master.getCatalog(); @@ -59,7 +60,7 @@ //TODO filter with database Collection<String> tableNames = catalog.getAllTableNames(selectedDatabase); - HAService haService = master.getContext().getHAService(); + ServiceTracker haService = master.getContext().getHAService(); String activeLabel = ""; if (haService != null) { if (haService.isActiveStatus()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/resources/webapps/admin/cluster.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/src/main/resources/webapps/admin/cluster.jsp index 1fb5e40..aca1153 100644 --- a/tajo-core/src/main/resources/webapps/admin/cluster.jsp +++ b/tajo-core/src/main/resources/webapps/admin/cluster.jsp @@ -21,8 +21,8 @@ <%@ page import="org.apache.tajo.master.TajoMaster" %> <%@ page import="org.apache.tajo.master.cluster.WorkerConnectionInfo" %> -<%@ page import="org.apache.tajo.ha.HAService" %> -<%@ page import="org.apache.tajo.ha.TajoMasterInfo" %> +<%@ page import="org.apache.tajo.service.ServiceTracker" %> +<%@ page import="org.apache.tajo.service.TajoMasterInfo" %> <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="org.apache.tajo.master.rm.WorkerResource" %> <%@ page import="org.apache.tajo.master.rm.WorkerState" %> @@ -30,6 +30,7 @@ <%@ page import="org.apache.tajo.util.TUtil" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="java.util.*" %> +<%@ page import="org.apache.tajo.service.ServiceTracker" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); @@ -75,7 +76,7 @@ String deadWorkersHtml = deadWorkers.isEmpty() ? "0": "<font color='red'>" + deadWorkers.size() + "</font>"; String deadQueryMastersHtml = deadQueryMasters.isEmpty() ? "0": "<font color='red'>" + deadQueryMasters.size() + "</font>"; - HAService haService = master.getContext().getHAService(); + ServiceTracker haService = master.getContext().getHAService(); List<TajoMasterInfo> masters = TUtil.newList(); String activeLabel = ""; http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/resources/webapps/admin/index.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp index d98abfd..0a0558e 100644 --- a/tajo-core/src/main/resources/webapps/admin/index.jsp +++ b/tajo-core/src/main/resources/webapps/admin/index.jsp @@ -23,8 +23,8 @@ <%@ page import="org.apache.tajo.conf.TajoConf" %> <%@ page import="org.apache.tajo.ipc.QueryCoordinatorProtocol" %> <%@ page import="org.apache.tajo.master.TajoMaster" %> -<%@ page import="org.apache.tajo.ha.HAService" %> -<%@ page import="org.apache.tajo.ha.TajoMasterInfo" %> +<%@ page import="org.apache.tajo.service.ServiceTracker" %> +<%@ page import="org.apache.tajo.service.TajoMasterInfo" %> <%@ page import="org.apache.tajo.master.QueryInProgress" %> <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="org.apache.tajo.master.rm.WorkerState" %> @@ -35,6 +35,7 @@ <%@ page import="java.util.Collection" %> <%@ page import="java.util.Date" %> <%@ page import="java.util.Map" %> +<%@ page import="org.apache.tajo.service.ServiceTracker" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); @@ -85,7 +86,7 @@ String numDeadWorkersHtml = numDeadWorkers == 0 ? "0" : "<font color='red'>" + numDeadWorkers + "</font>"; String numDeadQueryMastersHtml = numDeadQueryMasters == 0 ? "0" : "<font color='red'>" + numDeadQueryMasters + "</font>"; - HAService haService = master.getContext().getHAService(); + ServiceTracker haService = master.getContext().getHAService(); List<TajoMasterInfo> masters = TUtil.newList(); String activeLabel = ""; http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/resources/webapps/admin/query_executor.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp index 82836ac..a0f9a0a 100644 --- a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp @@ -19,13 +19,14 @@ %> <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <%@ page import="org.apache.tajo.master.TajoMaster" %> -<%@ page import="org.apache.tajo.ha.HAService" %> +<%@ page import="org.apache.tajo.service.ServiceTracker" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> +<%@ page import="javax.xml.ws.Service" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); - HAService haService = master.getContext().getHAService(); + ServiceTracker haService = master.getContext().getHAService(); String activeLabel = ""; if (haService != null) { if (haService.isActiveStatus()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java index a6f9f74..f8642ed 100644 --- a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java +++ b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java @@ -18,8 +18,6 @@ package org.apache.tajo.ha; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.net.NetUtils; @@ -29,6 +27,8 @@ import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.service.ServiceTrackerFactory; import org.junit.Test; import static junit.framework.TestCase.assertEquals; @@ -37,8 +37,6 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; public class TestHAServiceHDFSImpl { - private static Log LOG = LogFactory.getLog(TestHAServiceHDFSImpl.class); - private TajoTestingCluster cluster; private TajoMaster backupMaster; @@ -60,7 +58,8 @@ public class TestHAServiceHDFSImpl { try { FileSystem fs = cluster.getDefaultFileSystem(); - masterAddress = HAServiceUtil.getMasterUmbilicalName(conf).split(":")[0]; + ServiceTracker serviceTracker = ServiceTrackerFactory.get(conf); + masterAddress = serviceTracker.getUmbilicalAddress().getHostName(); setConfiguration(); @@ -112,6 +111,7 @@ public class TestHAServiceHDFSImpl { masterAddress + ":" + NetUtils.getFreeSocketPort()); conf.setVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS, masterAddress + ":" + NetUtils.getFreeSocketPort()); + conf.setBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE, true); //Client API service RPC Server http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java ---------------------------------------------------------------------- diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java index f00dc25..2264b62 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java @@ -23,12 +23,11 @@ import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.TajoConstants; -import org.apache.tajo.client.CatalogAdminClient; -import org.apache.tajo.client.QueryClient; -import org.apache.tajo.client.TajoClient; -import org.apache.tajo.client.TajoClientImpl; +import org.apache.tajo.client.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.jdbc.util.QueryStringDecoder; +import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.util.NetUtils; import java.io.IOException; import java.net.URI; @@ -109,7 +108,8 @@ public class JdbcConnection implements Connection { } try { - tajoClient = new TajoClientImpl(hostName, port, databaseName); + ServiceTracker serviceTracker = new DummyServiceTracker(NetUtils.createSocketAddr(hostName, port)); + tajoClient = new TajoClientImpl(tajoConf, serviceTracker, databaseName); } catch (Exception e) { throw new SQLException("Cannot create TajoClient instance:" + e.getMessage(), "TAJO-002"); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java ---------------------------------------------------------------------- diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java index eb7f8c9..7f89b46 100644 --- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java +++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java @@ -118,7 +118,7 @@ public class TajoStatement implements Statement { @Override public ResultSet executeQuery(String sql) throws SQLException { if (isClosed) { - throw new SQLFeatureNotSupportedException("Can't execute after statement has been closed"); + throw new SQLException("Can't execute after statement has been closed"); } try { @@ -130,7 +130,7 @@ public class TajoStatement implements Statement { return tajoClient.executeQueryAndGetResult(sql); } } catch (Exception e) { - throw new SQLFeatureNotSupportedException(e.getMessage(), e); + throw new SQLException(e.getMessage(), e); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java index af3d623..68e96d8 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java @@ -38,14 +38,6 @@ import java.util.ArrayList; import java.util.List; public class StorageUtil extends StorageConstants { - public static int getRowByteSize(Schema schema) { - int sum = 0; - for(Column col : schema.getColumns()) { - sum += StorageUtil.getColByteSize(col); - } - - return sum; - } public static int getColByteSize(Column col) { switch (col.getDataType().getType()) { @@ -83,14 +75,6 @@ public class StorageUtil extends StorageConstants { return 0; } } - - public static void writeTableMeta(Configuration conf, Path tableroot, TableMeta meta) throws IOException { - FileSystem fs = tableroot.getFileSystem(conf); - FSDataOutputStream out = fs.create(new Path(tableroot, ".meta")); - FileUtil.writeProto(out, meta.getProto()); - out.flush(); - out.close(); - } public static Path concatPath(String parent, String...childs) { return concatPath(new Path(parent), childs);
