Repository: tajo Updated Branches: refs/heads/master 35c24927b -> b16d13add
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java new file mode 100644 index 0000000..26bc97b --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.ha; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.net.NetUtils; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.util.HAServiceUtil; +import org.apache.tajo.util.TUtil; + +import java.io.IOException; +import java.util.List; + +/** + * This implements HAService utilizing HDFS cluster. 
This saves master status to HDFS cluster.
+ *
+ * <p>Each master registers itself by writing a small file, named after its umbilical RPC
+ * address with ':' replaced by '_', into either the active or the backup directory below the
+ * system HA directory. The file content is the master's client, resource tracker, catalog and
+ * web server addresses joined with '_'. A background "Ping Checker" thread periodically probes
+ * the current active master and re-registers this master when the active one is found dead.
+ *
+ * <p>NOTE(review): because '_' doubles as the separator and as the replacement for ':', a host
+ * name that itself contains '_' would presumably be parsed incorrectly -- confirm that such
+ * host names cannot occur.
+ */
+public class HAServiceHDFSImpl implements HAService {
+  private static final Log LOG = LogFactory.getLog(HAServiceHDFSImpl.class);
+
+  private MasterContext context;
+  private TajoConf conf;
+
+  private FileSystem fs;
+
+  private String masterName;
+  private Path rootPath;
+  private Path haPath;
+  private Path activePath;
+  private Path backupPath;
+
+  // Written by the ping checker thread during fail-over and read by other threads through
+  // isActiveStatus(), hence volatile.
+  private volatile boolean isActiveStatus = false;
+
+  // Thread which periodically checks whether the current active master is still alive.
+  private Thread checkerThread;
+  private volatile boolean stopped = false;
+
+  private int monitorInterval;
+
+  private String currentActiveMaster;
+
+  /**
+   * Creates the HA directories if necessary and reads this master's name and the monitor
+   * interval from the configuration. The master is not registered until {@link #register()}
+   * is called.
+   *
+   * @param context master context that supplies the configuration
+   * @throws IOException if the HA directories cannot be checked or created
+   */
+  public HAServiceHDFSImpl(MasterContext context) throws IOException {
+    this.context = context;
+    this.conf = context.getConf();
+    initSystemDirectory();
+    this.masterName = conf.get(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS.varname);
+    monitorInterval = conf.getIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL);
+  }
+
+  /** Resolves the file system of the Tajo root dir and creates the HA, active and backup dirs. */
+  private void initSystemDirectory() throws IOException {
+    // Get Tajo root dir
+    this.rootPath = TajoConf.getTajoRootDir(conf);
+
+    // The HA directories are accessed through the file system that hosts the Tajo root dir.
+    this.fs = rootPath.getFileSystem(conf);
+
+    haPath = TajoConf.getSystemHADir(conf);
+    ensureDirectory(haPath, "System HA dir");
+
+    activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME);
+    ensureDirectory(activePath, "System HA Active dir");
+
+    backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME);
+    ensureDirectory(backupPath, "System HA Backup dir");
+  }
+
+  /** Creates {@code dir} with the system resource permission when it does not exist yet. */
+  private void ensureDirectory(Path dir, String label) throws IOException {
+    if (!fs.exists(dir)) {
+      fs.mkdirs(dir, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION));
+      LOG.info(label + " '" + dir + "' is created");
+    }
+  }
+
+  /** Starts the ping checker thread once; later calls are no-ops. */
+  private void startPingChecker() {
+    if (checkerThread == null) {
+      checkerThread = new Thread(new PingChecker());
+      checkerThread.setName("Ping Checker");
+      checkerThread.start();
+    }
+  }
+
+  /** Name of the file that represents this master in the active or backup directory. */
+  private String masterFileName() {
+    return masterName.replace(':', '_');
+  }
+
+  /** Recovers a master address from the name of its file in the active or backup directory. */
+  private static String toMasterAddress(Path masterFile) {
+    return masterFile.getName().replace('_', ':');
+  }
+
+  /**
+   * Registers this master as active when no live active master exists, otherwise as backup,
+   * and then makes sure the ping checker is running.
+   *
+   * @throws IOException if the HA directories cannot be read or the master file cannot be written
+   */
+  @Override
+  public void register() throws IOException {
+    FileStatus[] files = fs.listStatus(activePath);
+
+    if (files.length == 0) {
+      // Phase 1: no active master is registered, so this master tries to become active.
+      becomeActive();
+    } else {
+      // Phase 2: an active master is registered; its status has to be checked.
+      Path activeMasterFile = files[0].getPath();
+      currentActiveMaster = toMasterAddress(activeMasterFile);
+
+      if (!HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) {
+        // Phase 3: the registered active master is dead, so this master takes over.
+        fs.delete(activeMasterFile, true);
+        becomeActive();
+      } else {
+        // Phase 4: the registered active master is alive, so this master becomes a backup.
+        createMasterFile(false);
+        LOG.info(String.format("This is added to backup masters (%s)", masterName));
+      }
+    }
+
+    // Started only after currentActiveMaster has been assigned, so that the checker never
+    // probes an unset address.
+    startPingChecker();
+  }
+
+  /** Writes this master's file into the active directory and records it as the active master. */
+  private void becomeActive() throws IOException {
+    createMasterFile(true);
+    currentActiveMaster = masterName;
+    LOG.info(String.format("This is added to active master (%s)", masterName));
+  }
+
+  /**
+   * Writes this master's address file into the active or the backup directory and updates the
+   * active flag accordingly.
+   */
+  private void createMasterFile(boolean isActive) throws IOException {
+    String hostName = masterName.split(":")[0];
+    Path path = new Path(isActive ? activePath : backupPath, masterFileName());
+
+    // Addresses that are not configured fall back to this host with the ports used below.
+    StringBuilder sb = new StringBuilder();
+    sb.append(context.getConf().get(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS.varname,
+        hostName + ":26002"));
+    sb.append("_");
+    sb.append(context.getConf().get(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS.varname,
+        hostName + ":26003"));
+    sb.append("_");
+    sb.append(context.getConf().get(TajoConf.ConfVars.CATALOG_ADDRESS.varname, hostName + ":26005"));
+    sb.append("_");
+    sb.append(context.getConf().get(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS.varname,
+        hostName + ":26080"));
+
+    FSDataOutputStream out = fs.create(path);
+    try {
+      out.writeUTF(sb.toString());
+    } finally {
+      // Release the stream even when the write fails.
+      out.close();
+    }
+
+    isActiveStatus = isActive;
+  }
+
+  /**
+   * Removes this master's files from the active and backup directories, clears the active flag
+   * and marks the service as stopped.
+   *
+   * @throws IOException if the files cannot be checked or deleted
+   */
+  @Override
+  public void delete() throws IOException {
+    String fileName = masterFileName();
+
+    Path activeFile = new Path(activePath, fileName);
+    if (fs.exists(activeFile)) {
+      fs.delete(activeFile, true);
+    }
+
+    Path backupFile = new Path(backupPath, fileName);
+    if (fs.exists(backupFile)) {
+      fs.delete(backupFile, true);
+    }
+
+    isActiveStatus = false;
+    // NOTE(review): this flag also ends the ping checker loop. PingChecker calls delete() right
+    // before register() during fail-over, so its loop exits after that iteration and
+    // startPingChecker() does not create a new thread -- confirm that this is intended.
+    stopped = true;
+  }
+
+  /** Returns whether this master last registered itself as the active master. */
+  @Override
+  public boolean isActiveStatus() {
+    return isActiveStatus;
+  }
+
+  /**
+   * Lists the registered masters: the active one first (only when exactly one active file
+   * exists), followed by every backup master.
+   *
+   * @throws IOException if the HA directories or a master file cannot be read
+   */
+  @Override
+  public List<TajoMasterInfo> getMasters() throws IOException {
+    List<TajoMasterInfo> list = TUtil.newList();
+
+    FileStatus[] files = fs.listStatus(activePath);
+    if (files.length == 1) {
+      list.add(createTajoMasterInfo(files[0].getPath(), true));
+    }
+
+    for (FileStatus status : fs.listStatus(backupPath)) {
+      list.add(createTajoMasterInfo(status.getPath(), false));
+    }
+
+    return list;
+  }
+
+  /**
+   * Builds a {@link TajoMasterInfo} from a master file: the master address comes from the file
+   * name, the other four addresses from the file content, and availability from a liveness
+   * probe of the master address.
+   *
+   * @throws IOException if the file cannot be read or does not hold four '_'-separated addresses
+   */
+  private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException {
+    String masterAddress = toMasterAddress(path);
+    boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf);
+
+    String data;
+    FSDataInputStream stream = fs.open(path);
+    try {
+      data = stream.readUTF();
+    } finally {
+      // Release the stream even when the read fails.
+      stream.close();
+    }
+
+    String[] addresses = data.split("_");
+    if (addresses.length < 4) {
+      // Report a corrupt file through the declared exception type rather than an
+      // ArrayIndexOutOfBoundsException.
+      throw new IOException("Malformed master file '" + path + "': " + data);
+    }
+
+    TajoMasterInfo info = new TajoMasterInfo();
+
+    info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress));
+    info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0]));
+    info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1]));
+    info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2]));
+    info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3]));
+
+    info.setAvailable(isAlive);
+    info.setActive(isActive);
+
+    return info;
+  }
+
+  /**
+   * Periodically probes the current active master and, when it is dead and no other master has
+   * replaced it yet, re-registers this master.
+   */
+  private class PingChecker implements Runnable {
+    @Override
+    public void run() {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        synchronized (HAServiceHDFSImpl.this) {
+          try {
+            boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName
+                  + ", isAlive:" + isAlive);
+            }
+
+            // If the active master is dead, this master should become the active master in
+            // place of the previous one, unless another master has already taken over.
+            if (!isAlive) {
+              FileStatus[] files = fs.listStatus(activePath);
+              if (files.length == 0 || (files.length == 1
+                  && currentActiveMaster.equals(toMasterAddress(files[0].getPath())))) {
+                delete();
+                register();
+              }
+            }
+          } catch (Exception e) {
+            // Keep monitoring; a failed check is retried on the next interval.
+            LOG.error("Failed to check the active master. - masterName:" + masterName, e);
+          }
+        }
+        try {
+          Thread.sleep(monitorInterval);
+        } catch (InterruptedException e) {
+          LOG.info("PingChecker interrupted. - masterName:" + masterName);
+          // Preserve the interrupt status for whoever inspects this thread afterwards.
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/java/org/apache/tajo/master/ha/TajoMasterInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ha/TajoMasterInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/ha/TajoMasterInfo.java
new file mode 100644
index 0000000..6ed975a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/ha/TajoMasterInfo.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.ha;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Mutable holder for what is known about one TajoMaster: the socket addresses of its services
+ * and two status flags. Every property is a plain field with a getter and a setter; nothing is
+ * validated or copied.
+ */
+public class TajoMasterInfo {
+
+  /** Address of the master itself. */
+  private InetSocketAddress tajoMasterAddress;
+
+  public InetSocketAddress getTajoMasterAddress() {
+    return this.tajoMasterAddress;
+  }
+
+  public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) {
+    this.tajoMasterAddress = tajoMasterAddress;
+  }
+
+  /** Address of the master's client service. */
+  private InetSocketAddress tajoClientAddress;
+
+  public InetSocketAddress getTajoClientAddress() {
+    return this.tajoClientAddress;
+  }
+
+  public void setTajoClientAddress(InetSocketAddress tajoClientAddress) {
+    this.tajoClientAddress = tajoClientAddress;
+  }
+
+  /** Address of the worker resource tracker. */
+  private InetSocketAddress workerResourceTrackerAddr;
+
+  public InetSocketAddress getWorkerResourceTrackerAddr() {
+    return this.workerResourceTrackerAddr;
+  }
+
+  public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) {
+    this.workerResourceTrackerAddr = workerResourceTrackerAddr;
+  }
+
+  /** Address of the catalog. */
+  private InetSocketAddress catalogAddress;
+
+  public InetSocketAddress getCatalogAddress() {
+    return this.catalogAddress;
+  }
+
+  public void setCatalogAddress(InetSocketAddress catalogAddress) {
+    this.catalogAddress = catalogAddress;
+  }
+
+  /** Address of the web server. */
+  private InetSocketAddress webServerAddress;
+
+  public InetSocketAddress getWebServerAddress() {
+    return this.webServerAddress;
+  }
+
+  public void setWebServerAddress(InetSocketAddress webServerAddress) {
+    this.webServerAddress = webServerAddress;
+  }
+
+  /** Availability flag, stored exactly as given to {@link #setAvailable(boolean)}. */
+  private boolean available;
+
+  public boolean isAvailable() {
+    return this.available;
+  }
+
+  public void setAvailable(boolean available) {
+    this.available = available;
+  }
+
+  /** Active flag, stored exactly as given to {@link #setActive(boolean)}. */
+  private boolean active;
+
+  public boolean isActive() {
+    return this.active;
+  }
+
+  public void setActive(boolean active) {
+    this.active = active;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index 88589f9..6a980de 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -46,6 +46,7 @@ import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.AbstractStorageManager; import org.apache.tajo.storage.StorageManagerFactory; +import org.apache.tajo.util.HAServiceUtil; import org.apache.tajo.util.NetUtils; import org.apache.tajo.worker.TajoWorker; @@ -228,8 +229,27 @@ public class QueryMaster extends CompositeService implements EventHandler { NettyClientBase rpc = null; try { - rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + // In TajoMaster HA mode, if backup master be active status, + // worker may fail to connect existing active master. Thus, + // if worker can't connect the master, worker should try to connect another master and + // update master address in worker context. 
+ if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + try { + rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } catch (Exception e) { + queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( + HAServiceUtil.getResourceTrackerAddress(systemConf)); + queryMasterContext.getWorkerContext().setTajoMasterAddress( + HAServiceUtil.getMasterUmbilicalAddress(systemConf)); + rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + } else { + rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + TajoMasterProtocol.TajoMasterProtocolService masterService = rpc.getStub(); CallFuture<TajoMasterProtocol.WorkerResourcesRequest> callBack = @@ -251,8 +271,27 @@ public class QueryMaster extends CompositeService implements EventHandler { LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId); NettyClientBase tmClient = null; try { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + // In TajoMaster HA mode, if backup master be active status, + // worker may fail to connect existing active master. Thus, + // if worker can't connect the master, worker should try to connect another master and + // update master address in worker context. 
+ if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + try { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } catch (Exception e) { + queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( + HAServiceUtil.getResourceTrackerAddress(systemConf)); + queryMasterContext.getWorkerContext().setTajoMasterAddress( + HAServiceUtil.getMasterUmbilicalAddress(systemConf)); + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + } else { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder() @@ -365,8 +404,25 @@ public class QueryMaster extends CompositeService implements EventHandler { NettyClientBase tmClient = null; try { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + // In TajoMaster HA mode, if backup master be active status, + // worker may fail to connect existing active master. Thus, + // if worker can't connect the master, worker should try to connect another master and + // update master address in worker context. 
+ if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + try { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } catch (Exception e) { + queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf)); + queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf)); + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + } else { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); masterClientService.heartbeat(future.getController(), queryHeartbeat, future); } catch (Exception e) { @@ -453,8 +509,27 @@ public class QueryMaster extends CompositeService implements EventHandler { for(QueryMasterTask eachTask: tempTasks) { NettyClientBase tmClient; try { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + // In TajoMaster HA mode, if backup master be active status, + // worker may fail to connect existing active master. Thus, + // if worker can't connect the master, worker should try to connect another master and + // update master address in worker context. 
+ if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + try { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } catch (Exception e) { + queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( + HAServiceUtil.getResourceTrackerAddress(systemConf)); + queryMasterContext.getWorkerContext().setTajoMasterAddress( + HAServiceUtil.getMasterUmbilicalAddress(systemConf)); + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + } else { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); CallFuture<TajoHeartbeatResponse> callBack = http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 1889a56..ce329fb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -58,6 +58,7 @@ import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.storage.AbstractStorageManager; +import org.apache.tajo.util.HAServiceUtil; import org.apache.tajo.util.metrics.TajoMetrics; import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter; import org.apache.tajo.worker.AbstractResourceAllocator; @@ -191,8 +192,27 @@ public class QueryMasterTask extends CompositeService { 
RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf()); NettyClientBase tmClient = null; try { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + // In TajoMaster HA mode, if backup master be active status, + // worker may fail to connect existing active master. Thus, + // if worker can't connect the master, worker should try to connect another master and + // update master address in worker context. + if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + try { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } catch (Exception e) { + queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( + HAServiceUtil.getResourceTrackerAddress(systemConf)); + queryMasterContext.getWorkerContext().setTajoMasterAddress( + HAServiceUtil.getMasterUmbilicalAddress(systemConf)); + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + } else { + tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); masterClientService.stopQueryMaster(null, queryId.getProto(), future); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java index 8b849ca..7ea2e48 100644 --- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java +++ 
b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java @@ -11,9 +11,11 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.client.QueryStatus; import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoHAClientUtil; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.jdbc.TajoResultSet; +import org.apache.tajo.util.HAServiceUtil; import org.apache.tajo.util.JSPUtil; import org.apache.tajo.util.TajoIdUtils; import org.codehaus.jackson.map.DeserializationConfig; @@ -271,6 +273,9 @@ public class QueryExecutorServlet extends HttpServlet { public void run() { startTime = System.currentTimeMillis(); try { + TajoConf conf = tajoClient.getConf(); + tajoClient = TajoHAClientUtil.getTajoClient(conf, tajoClient); + response = tajoClient.executeQuery(query); if (response == null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index e09a69e..c7e513d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -49,6 +49,7 @@ import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.util.ApplicationIdUtils; +import org.apache.tajo.util.HAServiceUtil; import java.util.ArrayList; import java.util.Collection; @@ -237,9 +238,31 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { RpcConnectionPool connPool = RpcConnectionPool.getPool(queryTaskContext.getConf()); NettyClientBase tmClient = null; try { - 
tmClient = connPool.getConnection( - queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + + // In TajoMaster HA mode, if backup master be active status, + // worker may fail to connect existing active master. Thus, + // if worker can't connect the master, worker should try to connect another master and + // update master address in worker context. + if (tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + try { + tmClient = connPool.getConnection( + queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } catch (Exception e) { + queryTaskContext.getQueryMasterContext().getWorkerContext(). + setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf)); + queryTaskContext.getQueryMasterContext().getWorkerContext(). + setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf)); + tmClient = connPool.getConnection( + queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + } else { + tmClient = connPool.getConnection( + queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), + TajoMasterProtocol.class, true); + } + TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); masterClientService.allocateWorkerResources(null, request, callBack); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index ed78e49..736cf51 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -35,6 +35,7 
@@ import org.apache.tajo.catalog.CatalogClient; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.master.ha.TajoMasterInfo; import org.apache.tajo.master.querymaster.QueryMaster; import org.apache.tajo.master.querymaster.QueryMasterManagerService; import org.apache.tajo.master.rm.TajoWorkerResourceManager; @@ -42,10 +43,7 @@ import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; -import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.util.NetUtils; -import org.apache.tajo.util.StringUtils; -import org.apache.tajo.util.TajoIdUtils; +import org.apache.tajo.util.*; import org.apache.tajo.util.metrics.TajoSystemMetrics; import org.apache.tajo.webapp.StaticHttpServer; @@ -83,9 +81,7 @@ public class TajoWorker extends CompositeService { private TajoWorkerManagerService tajoWorkerManagerService; - private InetSocketAddress tajoMasterAddress; - - private InetSocketAddress workerResourceTrackerAddr; + private TajoMasterInfo tajoMasterInfo; private CatalogClient catalogClient; @@ -238,8 +234,9 @@ public class TajoWorker extends CompositeService { super.init(conf); + tajoMasterInfo = new TajoMasterInfo(); if(yarnContainerMode && queryMasterMode) { - tajoMasterAddress = NetUtils.createSocketAddr(cmdArgs[2]); + tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(cmdArgs[2])); connectToCatalog(); QueryId queryId = TajoIdUtils.parseQueryId(cmdArgs[1]); @@ -248,8 +245,15 @@ public class TajoWorker extends CompositeService { } else if(yarnContainerMode && taskRunnerMode) { //TaskRunner mode taskRunnerManager.startTask(cmdArgs); } else { - tajoMasterAddress = NetUtils.createSocketAddr(systemConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS)); - workerResourceTrackerAddr = 
NetUtils.createSocketAddr(systemConf.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS)); + if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf)); + tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf)); + } else { + tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars + .TAJO_MASTER_UMBILICAL_RPC_ADDRESS))); + tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(systemConf.getVar(ConfVars + .RESOURCE_TRACKER_RPC_ADDRESS))); + } connectToCatalog(); } @@ -449,11 +453,19 @@ public class TajoWorker extends CompositeService { } public InetSocketAddress getTajoMasterAddress() { - return tajoMasterAddress; + return tajoMasterInfo.getTajoMasterAddress(); + } + + public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) { + tajoMasterInfo.setTajoMasterAddress(tajoMasterAddress); } public InetSocketAddress getResourceTrackerAddress() { - return workerResourceTrackerAddr; + return tajoMasterInfo.getWorkerResourceTrackerAddr(); + } + + public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) { + tajoMasterInfo.setWorkerResourceTrackerAddr(workerResourceTrackerAddr); } public int getPeerRpcPort() { http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java index 007bcbf..5ab5b5d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java @@ -34,6 +34,7 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import 
org.apache.tajo.storage.v2.DiskDeviceInfo; import org.apache.tajo.storage.v2.DiskMountInfo; import org.apache.tajo.storage.v2.DiskUtil; +import org.apache.tajo.util.HAServiceUtil; import java.io.File; import java.util.ArrayList; @@ -217,7 +218,22 @@ public class WorkerHeartbeatService extends AbstractService { CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack = new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>(); - rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true); + // In TajoMaster HA mode, if backup master be active status, + // worker may fail to connect existing active master. Thus, + // if worker can't connect the master, worker should try to connect another master and + // update master address in worker context. + if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + try { + rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true); + } catch (Exception e) { + context.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf)); + context.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf)); + rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true); + } + } else { + rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true); + } + TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub(); resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack); http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/resources/webapps/admin/catalogview.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp index 
fe6dc73..8f1d1bc 100644 --- a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp +++ b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp @@ -24,6 +24,7 @@ <%@ page import="org.apache.tajo.catalog.TableDesc" %> <%@ page import="org.apache.tajo.catalog.partition.PartitionMethodDesc" %> <%@ page import="org.apache.tajo.master.TajoMaster" %> +<%@ page import="org.apache.tajo.master.ha.HAService" %> <%@ page import="org.apache.tajo.util.FileUtil" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="java.util.Collection" %> @@ -57,6 +58,16 @@ //TODO filter with database Collection<String> tableNames = catalog.getAllTableNames(selectedDatabase); + + HAService haService = master.getContext().getHAService(); + String activeLabel = ""; + if (haService != null) { + if (haService.isActiveStatus()) { + activeLabel = "<font color='#1e90ff'>(active)</font>"; + } else { + activeLabel = "<font color='#1e90ff'>(backup)</font>"; + } + } %> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> @@ -69,7 +80,7 @@ <body> <%@ include file="header.jsp"%> <div class='contents'> - <h2>Tajo Master: <%=master.getMasterName()%></h2> + <h2>Tajo Master: <%=master.getMasterName()%> <%=activeLabel%></h2> <hr/> <h3>Catalog</h3> <div> http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/resources/webapps/admin/cluster.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/src/main/resources/webapps/admin/cluster.jsp index f454c66..0317759 100644 --- a/tajo-core/src/main/resources/webapps/admin/cluster.jsp +++ b/tajo-core/src/main/resources/webapps/admin/cluster.jsp @@ -19,13 +19,16 @@ %> <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> -<%@ page import="java.util.*" %> -<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> -<%@ page 
import="org.apache.tajo.master.*" %> -<%@ page import="org.apache.tajo.master.rm.*" %> +<%@ page import="org.apache.tajo.master.TajoMaster" %> +<%@ page import="org.apache.tajo.master.ha.HAService" %> +<%@ page import="org.apache.tajo.master.ha.TajoMasterInfo" %> +<%@ page import="org.apache.tajo.master.rm.Worker" %> +<%@ page import="org.apache.tajo.master.rm.WorkerResource" %> +<%@ page import="org.apache.tajo.master.rm.WorkerState" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="java.util.*" %> +<%@ page import="org.apache.tajo.util.TUtil" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); @@ -70,6 +73,33 @@ String deadWorkersHtml = deadWorkers.isEmpty() ? "0": "<font color='red'>" + deadWorkers.size() + "</font>"; String deadQueryMastersHtml = deadQueryMasters.isEmpty() ? "0": "<font color='red'>" + deadQueryMasters.size() + "</font>"; + + HAService haService = master.getContext().getHAService(); + List<TajoMasterInfo> masters = TUtil.newList(); + + String activeLabel = ""; + if (haService != null) { + if (haService.isActiveStatus()) { + activeLabel = "<font color='#1e90ff'>(active)</font>"; + } else { + activeLabel = "<font color='#1e90ff'>(backup)</font>"; + } + + masters.addAll(haService.getMasters()); + } + + int numLiveMasters = 0; + int numDeadMasters = 0; + + for(TajoMasterInfo eachMaster : masters) { + if (eachMaster.isAvailable()) { + numLiveMasters++; + } else { + numDeadMasters++; + } + } + String deadMasterHtml = numDeadMasters == 0 ? 
"0": "<font color='red'>" + numDeadMasters +"</font>"; + %> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> @@ -82,7 +112,54 @@ <body> <%@ include file="header.jsp"%> <div class='contents'> - <h2>Tajo Master: <%=master.getMasterName()%></h2> + <h2>Tajo Master: <%=master.getMasterName()%> <%=activeLabel%></h2> + <div>Live:<%=numLiveMasters%>, Dead: <%=deadMasterHtml%>, Total: <%=masters.size()%></div> +<% + if (masters != null) { + if(numLiveMasters == 0) { + out.write("No TajoMasters\n"); + } else { +%> + <p/> + <table width="100%" class="border_table" border="1"> + <tr><th>No</th><th>TajoMaster</th><th>Rpc Server</th><th>Rpc Client</th><th>ResourceTracker</th> + <th>Catalog</th><th>Active/Backup</th><th>Status</th></tr> + <% + int no = 1; + + for(TajoMasterInfo eachMaster : masters) { + String tajoMasterHttp = "http://" + eachMaster.getWebServerAddress().getHostName() + ":" + + eachMaster.getWebServerAddress().getPort() + "/index.jsp"; + String isActive = eachMaster.isActive() == true ? "ACTIVE" : "BACKUP"; + String isAvailable = eachMaster.isAvailable() == true ? 
"RUNNING" : "FAILED"; + %> + <tr> + <td width='30' align='right'><%=no++%></td> + <td><a href='<%=tajoMasterHttp%>'><%=eachMaster.getWebServerAddress().getHostName() + ":" + + eachMaster.getWebServerAddress().getPort()%></a></td> + <td width='200' align='right'><%=eachMaster.getTajoMasterAddress().getHostName() + ":" + + eachMaster.getTajoMasterAddress().getPort()%></td> + <td width='200' align='right'><%=eachMaster.getTajoClientAddress().getHostName() + ":" + + eachMaster.getTajoClientAddress().getPort()%></td> + <td width='200' align='right'><%=eachMaster.getWorkerResourceTrackerAddr().getHostName() + ":" + + eachMaster.getWorkerResourceTrackerAddr().getPort()%></td> + <td width='200' align='right'><%=eachMaster.getCatalogAddress().getHostName() + ":" + + eachMaster.getCatalogAddress().getPort()%></td> + <td width='200' align='right'><%=isActive%></td> + <td width='100' align='center'><%=isAvailable%></td> + </tr> + <% + } //end fo for + %> + </table> + <% + } //end of if + %> + <p/> +<% + } //end of if +%> + <hr/> <h2>Query Master</h2> <div>Live:<%=liveQueryMasters.size()%>, Dead: <%=deadQueryMastersHtml%>, QueryMaster Tasks: <%=runningQueryMasterTasks%></div> http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/resources/webapps/admin/index.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp index ebd017d..7ab1482 100644 --- a/tajo-core/src/main/resources/webapps/admin/index.jsp +++ b/tajo-core/src/main/resources/webapps/admin/index.jsp @@ -23,11 +23,15 @@ <%@ page import="org.apache.tajo.conf.TajoConf" %> <%@ page import="org.apache.tajo.ipc.TajoMasterProtocol" %> <%@ page import="org.apache.tajo.master.TajoMaster" %> +<%@ page import="org.apache.tajo.master.ha.HAService" %> +<%@ page import="org.apache.tajo.master.ha.TajoMasterInfo" %> <%@ page 
import="org.apache.tajo.master.querymaster.QueryInProgress" %> <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="org.apache.tajo.master.rm.WorkerState" %> <%@ page import="org.apache.tajo.util.NetUtils" %> +<%@ page import="org.apache.tajo.util.TUtil" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> +<%@ page import="java.util.List" %> <%@ page import="java.util.Collection" %> <%@ page import="java.util.Date" %> <%@ page import="java.util.Map" %> @@ -111,6 +115,32 @@ if(finishedQueries.size() > 0) { avgQueryTime = (int)(totalTime / (long)finishedQueries.size()); } + + + HAService haService = master.getContext().getHAService(); + List<TajoMasterInfo> masters = TUtil.newList(); + + String activeLabel = ""; + if (haService != null) { + if (haService.isActiveStatus()) { + activeLabel = "<font color='#1e90ff'>(active)</font>"; + } else { + activeLabel = "<font color='#1e90ff'>(backup)</font>"; + } + + masters.addAll(haService.getMasters()); + } + + int numLiveMasters = 0; + int numDeadMasters = 0; + + for(TajoMasterInfo eachMaster : masters) { + if (eachMaster.isAvailable()) { + numLiveMasters++; + } else { + numDeadMasters++; + } + } %> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> @@ -123,7 +153,7 @@ <body> <%@ include file="header.jsp"%> <div class='contents'> - <h2>Tajo Master: <%=master.getMasterName()%></h2> + <h2>Tajo Master: <%=master.getMasterName()%> <%=activeLabel%></h2> <hr/> <h3>Master Status</h3> <table border='0'> @@ -164,6 +194,21 @@ <td align='center'><%=clusterResourceSummary.getTotalMemoryMB() - clusterResourceSummary.getTotalAvailableMemoryMB()%>/<%=clusterResourceSummary.getTotalMemoryMB()%></td> <td align='center'><%=clusterResourceSummary.getTotalDiskSlots() - clusterResourceSummary.getTotalAvailableDiskSlots()%>/<%=clusterResourceSummary.getTotalDiskSlots()%></td> </tr> +<% + if (haService != null) { +%> + <tr> + <td><a href='cluster.jsp'>Masters</a></td> 
+ <td align='right'><%=haService.getMasters().size()%></td> + <td align='right'><%=numLiveMasters%></td> + <td align='right'><%=numDeadMasters%></td> + <td align='right'>-</td> + <td align='center'>-</td> + <td align='center'>-</td> + </tr> +<% + } +%> </table> <p/> <hr/> http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/resources/webapps/admin/query.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp index fecc806..e7b402f 100644 --- a/tajo-core/src/main/resources/webapps/admin/query.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query.jsp @@ -20,6 +20,7 @@ <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <%@ page import="org.apache.tajo.master.TajoMaster" %> +<%@ page import="org.apache.tajo.master.ha.HAService" %> <%@ page import="org.apache.tajo.master.querymaster.QueryInProgress" %> <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> @@ -55,6 +56,16 @@ portMap.put(queryMaster.getHostName(), queryMaster.getHttpPort()); } } + + HAService haService = master.getContext().getHAService(); + String activeLabel = ""; + if (haService != null) { + if (haService.isActiveStatus()) { + activeLabel = "<font color='#1e90ff'>(active)</font>"; + } else { + activeLabel = "<font color='#1e90ff'>(backup)</font>"; + } + } %> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> @@ -89,7 +100,7 @@ <body> <%@ include file="header.jsp"%> <div class='contents'> - <h2>Tajo Master: <%=master.getMasterName()%></h2> + <h2>Tajo Master: <%=master.getMasterName()%> <%=activeLabel%></h2> <hr/> <h3>Running Queries</h3> <% http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/main/resources/webapps/admin/query_executor.jsp 
---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp index 736b202..85debe5 100644 --- a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp @@ -19,10 +19,21 @@ %> <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <%@ page import="org.apache.tajo.master.TajoMaster" %> +<%@ page import="org.apache.tajo.master.ha.HAService" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); + + HAService haService = master.getContext().getHAService(); + String activeLabel = ""; + if (haService != null) { + if (haService.isActiveStatus()) { + activeLabel = "<font color='#1e90ff'>(active)</font>"; + } else { + activeLabel = "<font color='#1e90ff'>(backup)</font>"; + } + } %> <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> @@ -275,7 +286,7 @@ function getPage() { <body> <%@ include file="header.jsp"%> <div class='contents'> - <h2>Tajo Master: <%=master.getMasterName()%></h2> + <h2>Tajo Master: <%=master.getMasterName()%> <%=activeLabel%></h2> <hr/> <h3>Query</h3> <textarea id="query" style="width:800px; height:250px; font-family:Tahoma; font-size:12px;"></textarea> http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 38462bb..948f018 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ 
b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -85,9 +85,14 @@ public class TajoTestingCluster { public Boolean isHCatalogStoreUse = false; public TajoTestingCluster() { + this(false); + } + + public TajoTestingCluster(boolean masterHaEMode) { this.conf = new TajoConf(); + this.conf.setBoolVar(ConfVars.TAJO_MASTER_HA_ENABLE, masterHaEMode); initPropertiesAndConfigs(); - } + } void initPropertiesAndConfigs() { if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java new file mode 100644 index 0000000..86d18eb --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.master.ha; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.net.NetUtils; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.client.TajoClient; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.master.TajoMaster; +import org.junit.Test; + +import static junit.framework.TestCase.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +/** Test for TajoMaster HA: boots a TajoTestingCluster with HA enabled plus two additional TajoMaster instances, then checks the HA directory layout and the active/backup roles. NOTE(review): no shutdown of the cluster or of the extra masters is visible in this class - confirm cleanup happens elsewhere. */ public class TestHAServiceHDFSImpl { + private static Log LOG = LogFactory.getLog(TestHAServiceHDFSImpl.class); + + private TajoTestingCluster cluster; + private TajoMaster backupMaster1, backupMaster2; + + private TajoConf conf; + /* client is assigned in the test and testDir is only declared; neither is read in the code visible here. */ private TajoClient client; + private Path testDir; + + /* Populated by verifySystemDirectories(); backupPath is read afterwards by the test method. */ private Path haPath, activePath, backupPath; + + private static final String LOCAL_HOST = "localhost:"; + + /** Starts a mini cluster with HA enabled, starts two more masters, then asserts that the master names are distinct, the HA directories exist, each extra master has a file in the backup directory (named after the master with ':' replaced by '_'), and only the cluster's own master reports isActiveMaster(). */ @Test + public final void testTwoBackupMasters() throws Exception { + cluster = new TajoTestingCluster(true); + cluster.startMiniCluster(1); + + conf = cluster.getConfiguration(); + client = new TajoClient(conf); + + FileSystem fs = cluster.getDefaultFileSystem(); + startBackupMasters(); + + verifyMasterAddress(); + /* must run before backupPath is used below: it is what assigns backupPath */ verifySystemDirectories(fs); + + Path backupMasterFile1 = new Path(backupPath, backupMaster1.getMasterName() + .replaceAll(":", "_")); + assertTrue(fs.exists(backupMasterFile1)); + + Path backupMasterFile2 = new Path(backupPath, backupMaster2.getMasterName() + .replaceAll(":", "_")); + assertTrue(fs.exists(backupMasterFile2)); + + assertTrue(cluster.getMaster().isActiveMaster()); + assertFalse(backupMaster1.isActiveMaster()); + assertFalse(backupMaster2.isActiveMaster()); + } + + /** Initializes and starts two extra TajoMaster instances. Before each start, the client RPC, umbilical RPC, resource tracker and catalog addresses are set to "localhost:" plus a port from NetUtils.getFreeSocketPort(), and HA is enabled. NOTE(review): conf comes from cluster.getConfiguration(); presumably this is the instance the running cluster also holds, so these overrides may be visible to it - verify. */ private void startBackupMasters() throws Exception { + + conf = cluster.getConfiguration(); + 
conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, + LOCAL_HOST + NetUtils.getFreeSocketPort()); + conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, + LOCAL_HOST + NetUtils.getFreeSocketPort()); + conf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, + LOCAL_HOST + NetUtils.getFreeSocketPort()); + conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS, + LOCAL_HOST + NetUtils.getFreeSocketPort()); + conf.setBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE, true); + + backupMaster1 = new TajoMaster(); + backupMaster1.init(conf); + backupMaster1.start(); + + /* ask for a new set of ports before starting the second extra master */ conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, + LOCAL_HOST + NetUtils.getFreeSocketPort()); + conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, + LOCAL_HOST + NetUtils.getFreeSocketPort()); + conf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, + LOCAL_HOST + NetUtils.getFreeSocketPort()); + conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS, + LOCAL_HOST + NetUtils.getFreeSocketPort()); + conf.setBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE, true); + + backupMaster2 = new TajoMaster(); + backupMaster2.init(conf); + backupMaster2.start(); + } + + /** Asserts that the names of the cluster master and the two extra masters are pairwise different. */ private void verifyMasterAddress() { + assertNotEquals(cluster.getMaster().getMasterName(), + backupMaster1.getMasterName()); + assertNotEquals(cluster.getMaster().getMasterName(), + backupMaster2.getMasterName()); + assertNotEquals(backupMaster1.getMasterName(), + backupMaster2.getMasterName()); + } + + /** Resolves the HA root via TajoConf.getSystemHADir and asserts that it, its active subdirectory and its backup subdirectory exist on the given file system, holding exactly one and two entries respectively. Stores the three paths in the haPath, activePath and backupPath fields. */ private void verifySystemDirectories(FileSystem fs) throws Exception { + haPath = TajoConf.getSystemHADir(cluster.getConfiguration()); + assertTrue(fs.exists(haPath)); + + activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); + assertTrue(fs.exists(activePath)); + + backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); + assertTrue(fs.exists(backupPath)); + + assertEquals(1, fs.listStatus(activePath).length); + assertEquals(2, fs.listStatus(backupPath).length); + } +} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-dist/src/main/bin/start-tajo.sh ---------------------------------------------------------------------- diff --git a/tajo-dist/src/main/bin/start-tajo.sh b/tajo-dist/src/main/bin/start-tajo.sh index 3e11c39..63b7488 100755 --- a/tajo-dist/src/main/bin/start-tajo.sh +++ b/tajo-dist/src/main/bin/start-tajo.sh @@ -25,7 +25,20 @@ bin=`cd "$bin"; pwd` . "$bin"/tajo-config.sh # start the tajo master daemon -"$bin"/tajo-daemon.sh --config $TAJO_CONF_DIR start master +AUTOHA_ENABLED=$("$bin"/tajo getconf tajo.master.ha.enable) + +if [ "$AUTOHA_ENABLED" = "true" ]; then + echo "Starting TajoMasters on HA mode" + if [ -f "${TAJO_CONF_DIR}/masters" ]; then + MASTER_FILE=${TAJO_CONF_DIR}/masters + MASTER_NAMES=$(cat "$MASTER_FILE" | sed 's/#.*$//;/^$/d') + "$bin/tajo-daemons.sh" --hosts masters cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" start master + fi +else + echo "Starting single TajoMaster" + "$bin"/tajo-daemon.sh --config $TAJO_CONF_DIR start master +fi + if [ -f "${TAJO_CONF_DIR}/tajo-env.sh" ]; then . "${TAJO_CONF_DIR}/tajo-env.sh" http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-dist/src/main/bin/stop-tajo.sh ---------------------------------------------------------------------- diff --git a/tajo-dist/src/main/bin/stop-tajo.sh b/tajo-dist/src/main/bin/stop-tajo.sh index cbabadb..f50ae3a 100755 --- a/tajo-dist/src/main/bin/stop-tajo.sh +++ b/tajo-dist/src/main/bin/stop-tajo.sh @@ -24,7 +24,21 @@ bin=`cd "$bin"; pwd` . 
"$bin"/tajo-config.sh -"$bin"/tajo-daemon.sh --config $TAJO_CONF_DIR stop master +# stop the tajo master daemon +AUTOHA_ENABLED=$("$bin"/tajo getconf tajo.master.ha.enable) + +if [ "$AUTOHA_ENABLED" = "true" ]; then + echo "Stopping TajoMasters on HA mode" + if [ -f "${TAJO_CONF_DIR}/masters" ]; then + MASTER_FILE=${TAJO_CONF_DIR}/masters + MASTER_NAMES=$(cat "$MASTER_FILE" | sed 's/#.*$//;/^$/d') + "$bin/tajo-daemons.sh" --hosts masters cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" stop master + fi +else + echo "Stopping single TajoMaster" + "$bin"/tajo-daemon.sh --config $TAJO_CONF_DIR stop master +fi + if [ -f "${TAJO_CONF_DIR}/tajo-env.sh" ]; then . "${TAJO_CONF_DIR}/tajo-env.sh" http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-dist/src/main/bin/tajo ---------------------------------------------------------------------- diff --git a/tajo-dist/src/main/bin/tajo b/tajo-dist/src/main/bin/tajo index b6acdd8..4382a7a 100755 --- a/tajo-dist/src/main/bin/tajo +++ b/tajo-dist/src/main/bin/tajo @@ -68,6 +68,7 @@ if [ $# = 0 ]; then echo " catutil catalog utility" echo " cli run the tajo cli" echo " admin run the tajo admin util" + echo " haadmin run the tajo master HA admin util" echo " getconf print tajo configuration" echo " jar <jar> run a jar file" echo " benchmark run the benchmark driver" @@ -349,6 +350,10 @@ elif [ "$COMMAND" = "admin" ] ; then CLASS='org.apache.tajo.client.TajoAdmin' TAJO_ROOT_LOGGER_APPENDER="${TAJO_ROOT_LOGGER_APPENDER:-NullAppender}" TAJO_OPTS="$TAJO_OPTS $TAJO_CLI_OPTS" +elif [ "$COMMAND" = "haadmin" ] ; then + CLASS='org.apache.tajo.client.TajoHAAdmin' + TAJO_ROOT_LOGGER_APPENDER="${TAJO_ROOT_LOGGER_APPENDER:-NullAppender}" + TAJO_OPTS="$TAJO_OPTS $TAJO_CLI_OPTS" elif [ "$COMMAND" = "getconf" ] ; then CLASS='org.apache.tajo.client.TajoGetConf' TAJO_ROOT_LOGGER_APPENDER="${TAJO_ROOT_LOGGER_APPENDER:-NullAppender}" http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-dist/src/main/bin/tajo-daemons.sh 
---------------------------------------------------------------------- diff --git a/tajo-dist/src/main/bin/tajo-daemons.sh b/tajo-dist/src/main/bin/tajo-daemons.sh index 90c7a1f..6c6f05e 100755 --- a/tajo-dist/src/main/bin/tajo-daemons.sh +++ b/tajo-dist/src/main/bin/tajo-daemons.sh @@ -65,4 +65,5 @@ for slave in `cat "$HOSTLIST"`; do fi done + wait http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-docs/src/main/sphinx/configuration.rst ---------------------------------------------------------------------- diff --git a/tajo-docs/src/main/sphinx/configuration.rst b/tajo-docs/src/main/sphinx/configuration.rst index 8f5c355..f898969 100644 --- a/tajo-docs/src/main/sphinx/configuration.rst +++ b/tajo-docs/src/main/sphinx/configuration.rst @@ -10,4 +10,5 @@ Configuration configuration/tajo_master_configuration configuration/worker_configuration configuration/catalog_configuration - configuration/configuration_defaults \ No newline at end of file + configuration/configuration_defaults + configuration/ha_configuration http://git-wip-us.apache.org/repos/asf/tajo/blob/b16d13ad/tajo-docs/src/main/sphinx/configuration/ha_configuration.rst ---------------------------------------------------------------------- diff --git a/tajo-docs/src/main/sphinx/configuration/ha_configuration.rst b/tajo-docs/src/main/sphinx/configuration/ha_configuration.rst new file mode 100644 index 0000000..0eaa674 --- /dev/null +++ b/tajo-docs/src/main/sphinx/configuration/ha_configuration.rst @@ -0,0 +1,135 @@ +********************************* +High Availability for TajoMaster +********************************* + +TajoMaster is a Single Point of Failure in a Tajo Cluster because TajoMaster is the central controlling entity for all components of the Tajo system. 
TajoMaster failure prevents clients from submitting new queries to the cluster, and results in the disruption of the ability to run insert overwrite queries because the TajoWorker can't apply its statistical information to CatalogStore. Therefore, the high-availability (HA) of TajoMaster is essential for the high-availability of Tajo generally. + +Currently, TajoMaster HA provides the following elements: + +* Automatic failover of TajoMaster: Even if the active TajoMaster stops, the standby TajoMaster will become the active node. +* Preservation of the ongoing query in the cluster: Even if the active TajoMaster stops, the ongoing query will still complete in the cluster. + + +================================================ + Terminology +================================================ + +* Active master: The TajoMaster that is actively serving all operations from TajoClient and TajoWorker. +* Backup master: This TajoMaster waits to become active when the active master dies or becomes unhealthy. Users can set up multiple backup TajoMasters, and these servers monitor the status of the active master so that one of them can become active. + + +================================================ + Configuration File Settings +================================================ + +If you want to use TajoMaster HA mode, specify ``tajo.master.ha.enable`` as follows: + +.. code-block:: xml + + <property> + <name>tajo.master.ha.enable</name> + <value>true</value> + </property> + +If you use HA mode, all backup masters monitor the active master at 5-second intervals. If you want to change this period, specify ``tajo.master.ha.monitor.interval`` as follows: + +.. code-block:: xml + + <property> + <name>tajo.master.ha.monitor.interval</name> + <value>monitor interval</value> + </property> + + +================================================ + Backup Master Settings +================================================ + +If you want to run masters with ``start-tajo.sh``, specify your masters in ``conf/masters``. 
The file lists all host names of masters, one per line. By default, this file contains the single entry ``localhost``. You can easily add host names of masters via your favorite text editor. + +For example: :: + + $ cat > conf/masters + host1.domain.com + host2.domain.com + .... + + <ctrl + d> + +And then, you need to set up the tarball and the configuration files on the backup masters. + +.. note:: + + If you want to run the active master and a backup master on the same host, you may encounter TajoMaster port conflicts. To avoid this problem, you must change the backup master's primary ports to different ports in ``tajo-site.xml`` as follows: + + .. code-block:: xml + + <property> + <name>tajo.master.umbilical-rpc.address</name> + <value>localhost:36001</value> + <description>The default port is 26001.</description> + </property> + + <property> + <name>tajo.master.client-rpc.address</name> + <value>localhost:36002</value> + <description>The default port is 26002.</description> + </property> + + <property> + <name>tajo.resource-tracker.rpc.address</name> + <value>localhost:36003</value> + <description>The default port is 26003.</description> + </property> + + <property> + <name>tajo.catalog.client-rpc.address</name> + <value>localhost:36005</value> + <description>The default port is 26005.</description> + </property> + + <property> + <name>tajo.master.info-http.address</name> + <value>0.0.0.0:36080</value> + <description>The default port is 26080.</description> + </property> + + + You also need to change ``TAJO_PID_DIR`` to another directory in ``tajo-env.sh``. + + +================================================ + Launch a Tajo cluster +================================================ + +Then, execute ``start-tajo.sh`` :: + + $ $TAJO_HOME/bin/start-tajo.sh + +.. note:: + + You can't use HA mode with DerbyStore. Currently, only one TajoMaster can invoke Derby. If another master tries to invoke it, that master will never start. 
Also, setting a different catalog URI for a backup master is an incorrect configuration, because the active and backup masters would then not share the same catalog. + +================================================ + Administration HA state +================================================ + +If you want to transition any backup master to active master, execute ``tajo haadmin -transitionToActive`` :: + + $ $TAJO_HOME/bin/tajo haadmin -transitionToActive <target tajo.master.umbilical-rpc.address> + +If you want to transition any active master to backup master, execute ``tajo haadmin -transitionToBackup`` :: + + $ $TAJO_HOME/bin/tajo haadmin -transitionToBackup <target tajo.master.umbilical-rpc.address> + +If you want to find the state of any master, execute ``tajo haadmin -getState`` :: + + $ $TAJO_HOME/bin/tajo haadmin -getState <target tajo.master.umbilical-rpc.address> + +If you want to initialize HA information, execute ``tajo haadmin -formatHA`` :: + + $ $TAJO_HOME/bin/tajo haadmin -formatHA + +.. note:: + + Before formatting HA, you must shut down the Tajo cluster. \ No newline at end of file
