Repository: tajo Updated Branches: refs/heads/branch-0.10.1 568386dc8 -> 19554d801
TAJO-1586: TajoMaster HA startup failure on Yarn. (jaehwa) Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/19554d80 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/19554d80 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/19554d80 Branch: refs/heads/branch-0.10.1 Commit: 19554d80154735aaf0823884eb6d28061272d06a Parents: 568386d Author: JaeHwa Jung <[email protected]> Authored: Thu May 14 18:49:42 2015 +0900 Committer: JaeHwa Jung <[email protected]> Committed: Thu May 14 18:49:42 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/cli/tools/TajoAdmin.java | 6 +- .../org/apache/tajo/cli/tools/TajoHAAdmin.java | 13 +- .../apache/tajo/client/DummyServiceTracker.java | 19 +- .../java/org/apache/tajo/conf/TajoConf.java | 2 + .../java/org/apache/tajo/ha/HAConstants.java | 1 + .../java/org/apache/tajo/ha/HAServiceUtil.java | 253 --------------- .../apache/tajo/service/BaseServiceTracker.java | 31 +- .../apache/tajo/service/HAServiceTracker.java | 20 +- .../org/apache/tajo/service/ServiceTracker.java | 28 +- .../java/org/apache/tajo/util/FileUtil.java | 22 ++ .../org/apache/tajo/ha/HdfsServiceTracker.java | 322 +++++++++++-------- .../java/org/apache/tajo/master/TajoMaster.java | 21 +- .../main/java/org/apache/tajo/util/JSPUtil.java | 2 +- .../tajo/worker/TajoResourceAllocator.java | 1 - .../java/org/apache/tajo/worker/TajoWorker.java | 2 + .../resources/webapps/admin/catalogview.jsp | 10 +- .../main/resources/webapps/admin/cluster.jsp | 10 +- .../src/main/resources/webapps/admin/index.jsp | 10 +- .../src/main/resources/webapps/admin/query.jsp | 7 +- .../resources/webapps/admin/query_executor.jsp | 9 +- .../apache/tajo/ha/TestHAServiceHDFSImpl.java | 26 +- 22 files changed, 363 insertions(+), 454 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 4f718a9..39e2dfe 100644 --- a/CHANGES +++ b/CHANGES @@ -39,6 +39,8 @@ Release 0.10.1 - unreleased BUG FIXES + TAJO-1586: TajoMaster HA startup failure on Yarn. (jaehwa) + TAJO-1485: Datum 'Char' returned only 1byte. (Contributed by DaeMyung Kang, Committed by jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java index 5497435..6738489 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java @@ -30,7 +30,6 @@ import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.NetUtils; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.util.TajoIdUtils; import java.io.IOException; @@ -71,8 +70,8 @@ public class TajoAdmin { private TajoConf tajoConf; private TajoClient tajoClient; - private ServiceTracker serviceTracker; private Writer writer; + private ServiceTracker serviceTracker; public TajoAdmin(TajoConf tajoConf, Writer writer) { this(tajoConf, writer, null); @@ -82,6 +81,7 @@ public class TajoAdmin { this.tajoConf = tajoConf; this.writer = writer; this.tajoClient = tajoClient; + serviceTracker = ServiceTrackerFactory.get(this.tajoConf); } private void printUsage() { @@ -427,7 +427,7 @@ public class TajoAdmin { if (tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - List<String> list = HAServiceUtil.getMasters(tajoConf); + List<String> list = serviceTracker.getMasters(tajoConf); int i = 0; for (String master : list) { if (i > 0) { http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java index 127ee8c..84fab33 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java @@ -21,9 +21,8 @@ package org.apache.tajo.cli.tools; import com.google.protobuf.ServiceException; import org.apache.commons.cli.*; import org.apache.tajo.client.TajoClient; -import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ha.HAServiceUtil; +import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; import java.io.IOException; @@ -44,8 +43,8 @@ public class TajoHAAdmin { } private TajoConf tajoConf; - private TajoClient tajoClient; private Writer writer; + private ServiceTracker serviceTracker; public TajoHAAdmin(TajoConf tajoConf, Writer writer) { this(tajoConf, writer, null); @@ -54,7 +53,6 @@ public class TajoHAAdmin { public TajoHAAdmin(TajoConf tajoConf, Writer writer, TajoClient tajoClient) { this.tajoConf = tajoConf; this.writer = writer; - this.tajoClient = tajoClient; } private void printUsage() { @@ -127,9 +125,6 @@ public class TajoHAAdmin { return; } else if (hostName != null && port != null) { tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port); - tajoClient = new TajoClientImpl(ServiceTrackerFactory.get(tajoConf)); - } else if (hostName == null && port == null) { - tajoClient = new TajoClientImpl(ServiceTrackerFactory.get(tajoConf)); } if (!tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { @@ -160,7 +155,7 @@ public class TajoHAAdmin { private void getState(Writer writer, String param) throws ParseException, IOException, ServiceException { - int retValue = HAServiceUtil.getState(param, tajoConf); + int retValue = serviceTracker.getState(param, tajoConf); switch (retValue) { case 1: @@ -180,7 +175,7 @@ public class TajoHAAdmin { private void formatHA(Writer writer) throws ParseException, IOException, ServiceException { - int retValue = HAServiceUtil.formatHA(tajoConf); + int retValue = serviceTracker.formatHA(tajoConf); switch (retValue) { case 1: http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java b/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java index 762c2e7..cf826ea 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java @@ -18,6 +18,7 @@ package org.apache.tajo.client; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerException; @@ -25,6 +26,7 @@ import org.apache.tajo.service.TajoMasterInfo; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.List; public class DummyServiceTracker implements ServiceTracker { @@ -65,6 +67,21 @@ public class DummyServiceTracker implements ServiceTracker { } @Override + public int getState(String masterName, TajoConf conf) throws ServiceTrackerException { + return 0; + } + + @Override + public int formatHA(TajoConf conf) throws ServiceTrackerException { + return 0; + } + + @Override + public List<String> getMasters(TajoConf conf) throws ServiceTrackerException { + return new ArrayList<String>(); + } + + @Override public void register() throws IOException { } @@ -73,7 +90,7 @@ public class DummyServiceTracker implements ServiceTracker { } @Override - public boolean isActiveStatus() { + public boolean isActiveMaster() { return true; } http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 4ed8097..1cc1240 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -138,6 +138,8 @@ public class TajoConf extends Configuration { // High availability configurations TAJO_MASTER_HA_ENABLE("tajo.master.ha.enable", false, Validators.bool()), TAJO_MASTER_HA_MONITOR_INTERVAL("tajo.master.ha.monitor.interval", 5 * 1000), // 5 sec + TAJO_MASTER_HA_CLIENT_RETRY_MAX_NUM("tajo.master.ha.client.read.retry.max-num", 120), // 120 retry + TAJO_MASTER_HA_CLIENT_RETRY_PAUSE_TIME("tajo.master.ha.client.read.pause-time", 500), // 500 ms // Service discovery DEFAULT_SERVICE_TRACKER_CLASS("tajo.discovery.service-tracker.class", BaseServiceTracker.class.getCanonicalName()), http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java b/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java index c5f4b8a..7af19c6 100644 --- a/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java +++ b/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java @@ -24,4 +24,5 @@ public class HAConstants { public final static int RESOURCE_TRACKER_RPC_ADDRESS = 3; public final static int CATALOG_ADDRESS = 4; public final static int MASTER_INFO_ADDRESS = 5; + public final static String ACTIVE_LOCK_FILE = "active.lock"; } http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java b/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java deleted file mode 100644 index 7001228..0000000 --- a/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java +++ /dev/null @@ -1,253 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.ha; - -import org.apache.hadoop.fs.*; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.util.NetUtils; - - -import javax.net.SocketFactory; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.ArrayList; -import java.util.List; - -public class HAServiceUtil { - - public static InetSocketAddress getMasterClientAddress(TajoConf conf) { - return getMasterAddress(conf, HAConstants.MASTER_CLIENT_RPC_ADDRESS); - } - - public static String getMasterClientName(TajoConf conf) { - return NetUtils.normalizeInetSocketAddress(getMasterClientAddress(conf)); - } - - public static InetSocketAddress getMasterAddress(TajoConf conf, int type) { - InetSocketAddress masterAddress = null; - - if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - FileSystem fs = getFileSystem(conf); - Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); - - if (fs.exists(activePath)) { - FileStatus[] files = fs.listStatus(activePath); - - if (files.length == 1) { - Path file = files[0].getPath(); - String hostAddress = file.getName().replaceAll("_", ":"); - FSDataInputStream stream = fs.open(file); - String data = stream.readUTF(); - stream.close(); - - String[] addresses = data.split("_"); - - switch (type) { - case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS: - masterAddress = NetUtils.createSocketAddr(hostAddress); - break; - case HAConstants.MASTER_CLIENT_RPC_ADDRESS: - masterAddress = NetUtils.createSocketAddr(addresses[0]); - break; - case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS: - masterAddress = NetUtils.createSocketAddr(addresses[1]); - break; - case HAConstants.CATALOG_ADDRESS: - masterAddress = NetUtils.createSocketAddr(addresses[2]); - break; - case HAConstants.MASTER_INFO_ADDRESS: - masterAddress = NetUtils.createSocketAddr(addresses[3]); - break; - default: - break; - } - } - } - - } catch (Exception e) { - e.printStackTrace(); - } - } - - if (masterAddress == null) { - switch (type) { - case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS: - masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars - .TAJO_MASTER_UMBILICAL_RPC_ADDRESS)); - break; - case HAConstants.MASTER_CLIENT_RPC_ADDRESS: - masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars - .TAJO_MASTER_CLIENT_RPC_ADDRESS)); - break; - case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS: - masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars - .RESOURCE_TRACKER_RPC_ADDRESS)); - break; - case HAConstants.CATALOG_ADDRESS: - masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars - .CATALOG_ADDRESS)); - break; - case HAConstants.MASTER_INFO_ADDRESS: - masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars - .TAJO_MASTER_INFO_ADDRESS)); - break; - default: - break; - } - } - - return masterAddress; - } - - public static boolean isMasterAlive(String masterName, TajoConf conf) { - boolean isAlive = true; - - try { - // how to create sockets - SocketFactory socketFactory = org.apache.hadoop.net.NetUtils.getDefaultSocketFactory(conf); - - int connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, - CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT); - - InetSocketAddress server = org.apache.hadoop.net.NetUtils.createSocketAddr(masterName); - - // connected socket - Socket socket = socketFactory.createSocket(); - org.apache.hadoop.net.NetUtils.connect(socket, server, connectionTimeout); - } catch (Exception e) { - isAlive = false; - } - return isAlive; - } - - public static int getState(String masterName, TajoConf conf) { - String targetMaster = masterName.replaceAll(":", "_"); - int retValue = -1; - - try { - FileSystem fs = getFileSystem(conf); - Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); - Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); - - Path temPath = null; - - // Check backup masters - FileStatus[] files = fs.listStatus(backupPath); - for (FileStatus status : files) { - temPath = status.getPath(); - if (temPath.getName().equals(targetMaster)) { - return 0; - } - } - - // Check active master - files = fs.listStatus(activePath); - if (files.length == 1) { - temPath = files[0].getPath(); - if (temPath.getName().equals(targetMaster)) { - return 1; - } - } - retValue = -2; - } catch (Exception e) { - e.printStackTrace(); - } - return retValue; - } - - public static int formatHA(TajoConf conf) { - int retValue = -1; - try { - FileSystem fs = getFileSystem(conf); - Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); - Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); - Path temPath = null; - - int aliveMasterCount = 0; - // Check backup masters - FileStatus[] files = fs.listStatus(backupPath); - for (FileStatus status : files) { - temPath = status.getPath(); - if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) { - aliveMasterCount++; - } - } - - // Check active master - files = fs.listStatus(activePath); - if (files.length == 1) { - temPath = files[0].getPath(); - if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) { - aliveMasterCount++; - } - } - - // If there is any alive master, users can't format storage. - if (aliveMasterCount > 0) { - return 0; - } - - // delete ha path. - fs.delete(TajoConf.getSystemHADir(conf), true); - retValue = 1; - } catch (Exception e) { - e.printStackTrace(); - } - return retValue; - } - - - public static List<String> getMasters(TajoConf conf) { - List<String> list = new ArrayList<String>(); - - try { - FileSystem fs = getFileSystem(conf); - Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); - Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); - Path temPath = null; - - // Check backup masters - FileStatus[] files = fs.listStatus(backupPath); - for (FileStatus status : files) { - temPath = status.getPath(); - list.add(temPath.getName().replaceAll("_", ":")); - } - - // Check active master - files = fs.listStatus(activePath); - if (files.length == 1) { - temPath = files[0].getPath(); - list.add(temPath.getName().replaceAll("_", ":")); - } - - } catch (Exception e) { - e.printStackTrace(); - } - return list; - } - - private static FileSystem getFileSystem(TajoConf conf) throws IOException { - Path rootPath = TajoConf.getTajoRootDir(conf); - return rootPath.getFileSystem(conf); - } - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java index bf7fd2c..e598f2a 100644 --- a/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java +++ b/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java @@ -77,6 +77,29 @@ public class BaseServiceTracker implements ServiceTracker { } @Override + public int getState(String masterName, TajoConf conf) throws ServiceTrackerException { + String masterAddress = getMasterAddress(); + + if (masterAddress.equals(masterName)) { + return 1; + } else { + return 0; + } + } + + @Override + public int formatHA(TajoConf conf) throws ServiceTrackerException { + throw new ServiceTrackerException("Cannot format HA directories on non-HA mode"); + } + + @Override + public List<String> getMasters(TajoConf conf) throws ServiceTrackerException { + List<String> list = TUtil.newList(); + list.add(getMasterAddress()); + return list; + } + + @Override public void register() throws IOException { } @@ -85,7 +108,7 @@ public class BaseServiceTracker implements ServiceTracker { } @Override - public boolean isActiveStatus() { + public boolean isActiveMaster() { return true; } @@ -94,4 +117,10 @@ public class BaseServiceTracker implements ServiceTracker { return tajoMasterInfos; } + private String getMasterAddress() { + String masterAddress = tajoMasterInfo.getTajoMasterAddress().getAddress().getHostAddress() + ":" + tajoMasterInfo + .getTajoMasterAddress().getPort(); + + return masterAddress; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java index c808537..081b153 100644 --- a/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java +++ b/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java @@ -18,13 +18,18 @@ package org.apache.tajo.service; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.net.NetUtils; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.util.FileUtil; import javax.net.SocketFactory; import java.net.InetSocketAddress; import java.net.Socket; public abstract class HAServiceTracker implements ServiceTracker { + private static final Log LOG = LogFactory.getLog(HAServiceTracker.class); static SocketFactory socketFactory = SocketFactory.getDefault(); @@ -32,16 +37,29 @@ public abstract class HAServiceTracker implements ServiceTracker { return true; } + public static boolean checkConnection(String address) { + return checkConnection(address, ":"); + } + + public static boolean checkConnection(String address, String delimiter) { + String[] hostAddress = address.split(delimiter); + InetSocketAddress socketAddress = new InetSocketAddress(hostAddress[0], Integer.parseInt(hostAddress[1])); + return checkConnection(socketAddress); + } + public static boolean checkConnection(InetSocketAddress address) { boolean isAlive = true; + Socket socket = null; try { int connectionTimeout = 10; - Socket socket = socketFactory.createSocket(); + socket = socketFactory.createSocket(); NetUtils.connect(socket, address, connectionTimeout); } catch (Exception e) { isAlive = false; + } finally { + FileUtil.cleanup(LOG, socket); } return isAlive; } http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java index 73ff112..5888ff3 100644 --- a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java +++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java @@ -18,46 +18,54 @@ package org.apache.tajo.service; +import org.apache.tajo.conf.TajoConf; + import java.io.IOException; import java.net.InetSocketAddress; import java.util.List; public interface ServiceTracker { - public abstract boolean isHighAvailable(); + boolean isHighAvailable(); + + InetSocketAddress getUmbilicalAddress() throws ServiceTrackerException; + + InetSocketAddress getClientServiceAddress() throws ServiceTrackerException; + + InetSocketAddress getResourceTrackerAddress() throws ServiceTrackerException; - public abstract InetSocketAddress getUmbilicalAddress() throws ServiceTrackerException; + InetSocketAddress getCatalogAddress() throws ServiceTrackerException; - public abstract InetSocketAddress getClientServiceAddress() throws ServiceTrackerException; + InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException; - public abstract InetSocketAddress getResourceTrackerAddress() throws ServiceTrackerException; + int getState(String masterName, TajoConf conf) throws ServiceTrackerException; - public abstract InetSocketAddress getCatalogAddress() throws ServiceTrackerException; + int formatHA(TajoConf conf) throws ServiceTrackerException; - public abstract InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException; + List<String> getMasters(TajoConf conf) throws ServiceTrackerException; /** * Add master name to shared storage. */ - public void register() throws IOException; + void register() throws IOException; /** * Delete master name to shared storage. * */ - public void delete() throws IOException; + void delete() throws IOException; /** * * @return True if current master is an active master. */ - public boolean isActiveStatus(); + boolean isActiveMaster(); /** * * @return return all master list * @throws IOException */ - public List<TajoMasterInfo> getMasters() throws IOException; + List<TajoMasterInfo> getMasters() throws IOException; } http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java index 9aa6af9..0f17926 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java @@ -19,6 +19,7 @@ package org.apache.tajo.util; import com.google.protobuf.Message; +import org.apache.commons.logging.Log; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.IOUtils; @@ -130,4 +131,25 @@ public class FileUtil { public static boolean isLocalPath(Path path) { return path.toUri().getScheme().equals("file"); } + + /** + * Close the Closeable objects and <b>ignore</b> any {@link IOException} or + * null pointers. Must only be used for cleanup in exception handlers. + * + * @param log the log to record problems to at debug level. Can be null. + * @param closeables the objects to close + */ + public static void cleanup(Log log, java.io.Closeable... closeables) { + for (java.io.Closeable c : closeables) { + if (c != null) { + try { + c.close(); + } catch(IOException e) { + if (log != null && log.isDebugEnabled()) { + log.debug("Exception in closing " + c, e); + } + } + } + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java index 4a782ec..5f1aff8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java +++ b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.ShutdownHookManager; import org.apache.tajo.TajoConstants; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; @@ -31,7 +32,8 @@ import org.apache.tajo.master.TajoMaster; import org.apache.tajo.service.HAServiceTracker; import org.apache.tajo.service.ServiceTrackerException; import org.apache.tajo.service.TajoMasterInfo; -import org.apache.tajo.util.TUtil; +import org.apache.tajo.util.*; +import org.apache.tajo.util.FileUtil; import javax.net.SocketFactory; import java.io.IOException; @@ -58,7 +60,7 @@ public class HdfsServiceTracker extends HAServiceTracker { private Path activePath; private Path backupPath; - private boolean isActiveStatus = false; + private boolean isActiveMaster = false; //thread which runs periodically to see the last time since a heartbeat is received. private Thread checkerThread; @@ -74,8 +76,7 @@ public class HdfsServiceTracker extends HAServiceTracker { InetSocketAddress socketAddress = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS); this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort(); - - monitorInterval = conf.getIntVar(ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL); + this.monitorInterval = conf.getIntVar(ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL); } private void initSystemDirectory() throws IOException { @@ -113,87 +114,144 @@ public class HdfsServiceTracker extends HAServiceTracker { } } + /** + * It will creates the following form string. It includes + * + * <pre> + * {CLIENT_RPC_HOST:PORT}_{RESOURCE_TRACKER_HOST:PORT}_{CATALOG_HOST:PORT}_{MASTER_WEB_HOST:PORT} + * </pre> + * + * @throws IOException + */ @Override public void register() throws IOException { - FileStatus[] files = fs.listStatus(activePath); + // Check lock file + boolean lockResult = createLockFile(); + + String fileName = masterName.replaceAll(":", "_"); + Path activeFile = new Path(activePath, fileName); + Path backupFile = new Path(backupPath, fileName); + + // Set TajoMasterInfo object which has several rpc server addresses. + StringBuilder sb = new StringBuilder(); + InetSocketAddress address = getHostAddress(HAConstants.MASTER_UMBILICAL_RPC_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); + + address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); + + address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); + + address = getHostAddress(HAConstants.CATALOG_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); + + address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()); // Phase 1: If there is not another active master, this try to become active master. - if (files.length == 0) { - createMasterFile(true); + if (lockResult) { + fs.delete(backupFile, false); + createMasterFile(activeFile, sb); currentActiveMaster = masterName; + writeSystemConf(); LOG.info(String.format("This is added to active master (%s)", masterName)); } else { // Phase 2: If there is active master information, we need to check its status. - Path activePath = files[0].getPath(); - currentActiveMaster = activePath.getName().replaceAll("_", ":"); + FileStatus[] files = fs.listStatus(activePath); + Path existingActiveFile = null; + if (files.length > 2) { + throw new ServiceTrackerException("Three or more than active master entries."); + } + for(FileStatus eachFile : files) { + if (!eachFile.getPath().getName().equals(HAConstants.ACTIVE_LOCK_FILE)) { + existingActiveFile = eachFile.getPath(); + } + } + currentActiveMaster = existingActiveFile.getName().replaceAll("_", ":"); // Phase 3: If current active master is dead, this master should be active master. - if (!HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) { - fs.delete(activePath, true); - createMasterFile(true); + if (!checkConnection(currentActiveMaster)) { + fs.delete(existingActiveFile, false); + fs.delete(backupFile, false); + createMasterFile(activeFile, sb); currentActiveMaster = masterName; LOG.info(String.format("This is added to active master (%s)", masterName)); } else { // Phase 4: If current active master is alive, this master need to be backup master. - createMasterFile(false); - LOG.info(String.format("This is added to backup masters (%s)", masterName)); + if (masterName.equals(currentActiveMaster)) { + LOG.info(String.format("This has already been added to active master (%s)", masterName)); + } else { + if (fs.exists(backupFile)) { + LOG.info(String.format("This has already been added to backup masters (%s)", masterName)); + } else { + createMasterFile(backupFile, sb); + LOG.info(String.format("This is added to backup master (%s)", masterName)); + } + } } } + startPingChecker(); } /** - * It will creates the following form string. It includes + * Storing the system configs * - * <pre> - * {CLIENT_RPC_HOST:PORT}_{RESOURCE_TRACKER_HOST:PORT}_{CATALOG_HOST:PORT}_{MASTER_WEB_HOST:PORT} - * </pre> - * - * @param isActive A boolean flag to indicate if it is for master or not. * @throws IOException */ - private void createMasterFile(boolean isActive) throws IOException { - String fileName = masterName.replaceAll(":", "_"); - Path path = null; + private void writeSystemConf() throws IOException { + Path systemConfPath = TajoConf.getSystemConfPath(conf); - if (isActive) { - path = new Path(activePath, fileName); - } else { - path = new Path(backupPath, fileName); + FSDataOutputStream out = FileSystem.create(fs, systemConfPath, + new FsPermission(TajoMaster.SYSTEM_CONF_FILE_PERMISSION)); + try { + conf.writeXml(out); + } finally { + out.close(); } + fs.setReplication(systemConfPath, (short) conf.getIntVar(ConfVars.SYSTEM_CONF_REPLICA_COUNT)); + } - StringBuilder sb = new StringBuilder(); - InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS); - sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); - - address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS); - sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); - - address = getHostAddress(HAConstants.CATALOG_ADDRESS); - sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); + private boolean createLockFile() throws IOException { + boolean result = false; + FSDataOutputStream lockOutput = null; - address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS); - sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()); + Path lockFile = new Path(activePath, HAConstants.ACTIVE_LOCK_FILE); + try { + lockOutput = fs.create(lockFile, false); + lockOutput.hsync(); + lockOutput.close(); + fs.deleteOnExit(lockFile); + result = true; + } catch (FileAlreadyExistsException e) { + LOG.info(String.format("Lock file already exists at (%s)", lockFile.toString())); + result = false; + } catch (Exception e) { + throw new IOException("Lock file creation is failed - " + e.getMessage()); + } finally { + FileUtil.cleanup(LOG, lockOutput); + } - FSDataOutputStream out = fs.create(path); + return result; + } + private void createMasterFile(Path path, StringBuilder sb) throws IOException { + FSDataOutputStream out = null; try { + out = fs.create(path, false); + out.writeUTF(sb.toString()); out.hsync(); out.close(); - } catch (FileAlreadyExistsException e) { - createMasterFile(false); - } - if (isActive) { - isActiveStatus = true; - } else { - isActiveStatus = false; + fs.deleteOnExit(path); + } catch (Exception e) { + throw new IOException("File creation is failed - " + e.getMessage()); + } finally { + FileUtil.cleanup(LOG, out); } - - startPingChecker(); } - private InetSocketAddress getHostAddress(int type) { InetSocketAddress address = null; @@ -226,65 +284,61 @@ public class HdfsServiceTracker extends HAServiceTracker { @Override public void delete() throws IOException { + if (ShutdownHookManager.get().isShutdownInProgress()) return; + String fileName = masterName.replaceAll(":", "_"); - Path activeFile = new Path(activePath, fileName); - if (fs.exists(activeFile)) { - fs.delete(activeFile, true); - } + fs.delete(new Path(activePath, fileName), false); + fs.delete(new Path(activePath, HAConstants.ACTIVE_LOCK_FILE), false); + fs.delete(new Path(backupPath, fileName), false); - Path backupFile = new Path(backupPath, fileName); - if (fs.exists(backupFile)) { - fs.delete(backupFile, true); - } - if (isActiveStatus) { - isActiveStatus = false; - } stopped = true; } @Override - public boolean isActiveStatus() { - return isActiveStatus; + public boolean isActiveMaster() { + if (currentActiveMaster.equals(masterName)) { + return true; + } else { + return false; + } } @Override public List<TajoMasterInfo> getMasters() throws IOException { List<TajoMasterInfo> list = TUtil.newList(); - Path path = null; FileStatus[] files = fs.listStatus(activePath); - if (files.length == 1) { - path = files[0].getPath(); - list.add(createTajoMasterInfo(path, true)); + for(FileStatus status : files) { + if (!status.getPath().getName().equals(HAConstants.ACTIVE_LOCK_FILE)) { + list.add(getTajoMasterInfo(status.getPath(), true)); + } } files = fs.listStatus(backupPath); for (FileStatus status : files) { - path = status.getPath(); - list.add(createTajoMasterInfo(path, false)); + list.add(getTajoMasterInfo(status.getPath(), false)); } return list; } - private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException { + private TajoMasterInfo getTajoMasterInfo(Path path, boolean isActive) throws IOException { String masterAddress = path.getName().replaceAll("_", ":"); - boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf); + boolean isAlive = checkConnection(masterAddress); FSDataInputStream stream = fs.open(path); String data = stream.readUTF(); - stream.close(); String[] addresses = data.split("_"); TajoMasterInfo info = new TajoMasterInfo(); - info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress)); - info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0])); - info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1])); - info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2])); - info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3])); + info.setTajoMasterAddress(NetUtils.createSocketAddr(addresses[0])); + info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[1])); + info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[2])); + info.setCatalogAddress(NetUtils.createSocketAddr(addresses[3])); + info.setWebServerAddress(NetUtils.createSocketAddr(addresses[4])); info.setAvailable(isAlive); info.setActive(isActive); @@ -299,21 +353,18 @@ public class HdfsServiceTracker extends HAServiceTracker { synchronized (HdfsServiceTracker.this) { try { if (!currentActiveMaster.equals(masterName)) { - boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf); if (LOG.isDebugEnabled()) { - LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName - + ", isAlive:" + isAlive); + LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName); } // If active master is dead, this master should be active master instead of // previous active master. - if (!isAlive) { - FileStatus[] files = fs.listStatus(activePath); - if (files.length == 0 || (files.length == 1 - && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) { - delete(); - register(); - } + if (!checkConnection(currentActiveMaster)) { + Path activeFile = new Path(activePath, currentActiveMaster.replaceAll(":", "_")); + fs.delete(activeFile, false); + Path lockFile = new Path(activePath, HAConstants.ACTIVE_LOCK_FILE); + fs.delete(lockFile, false); + register(); } } } catch (Exception e) { @@ -345,7 +396,7 @@ public class HdfsServiceTracker extends HAServiceTracker { @Override public InetSocketAddress getUmbilicalAddress() { if (!checkConnection(umbilicalRpcAddr)) { - umbilicalRpcAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(MASTER_UMBILICAL_RPC_ADDRESS)); + umbilicalRpcAddr = NetUtils.createSocketAddr(getAddressElements().get(MASTER_UMBILICAL_RPC_ADDRESS)); } return umbilicalRpcAddr; @@ -354,7 +405,7 @@ public class HdfsServiceTracker extends HAServiceTracker { @Override public InetSocketAddress getClientServiceAddress() { if (!checkConnection(clientRpcAddr)) { - clientRpcAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(MASTER_CLIENT_RPC_ADDRESS)); + clientRpcAddr = NetUtils.createSocketAddr(getAddressElements().get(MASTER_CLIENT_RPC_ADDRESS)); } return clientRpcAddr; @@ -363,7 +414,7 @@ public class HdfsServiceTracker extends HAServiceTracker { @Override public InetSocketAddress getResourceTrackerAddress() { if (!checkConnection(resourceTrackerRpcAddr)) { - resourceTrackerRpcAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(RESOURCE_TRACKER_RPC_ADDRESS)); + resourceTrackerRpcAddr = NetUtils.createSocketAddr(getAddressElements().get(RESOURCE_TRACKER_RPC_ADDRESS)); } return resourceTrackerRpcAddr; @@ -372,7 +423,7 @@ public class HdfsServiceTracker extends HAServiceTracker { @Override public InetSocketAddress getCatalogAddress() { if (!checkConnection(catalogAddr)) { - catalogAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(CATALOG_ADDRESS)); + catalogAddr = NetUtils.createSocketAddr(getAddressElements().get(CATALOG_ADDRESS)); } return catalogAddr; @@ -381,7 +432,7 @@ public class HdfsServiceTracker extends HAServiceTracker { @Override public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException { if (!checkConnection(masterHttpInfoAddr)) { - masterHttpInfoAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(MASTER_HTTP_INFO)); + masterHttpInfoAddr = NetUtils.createSocketAddr(getAddressElements().get(MASTER_HTTP_INFO)); } return masterHttpInfoAddr; @@ -390,11 +441,10 @@ public class HdfsServiceTracker extends HAServiceTracker { /** * Reads a text file stored in HDFS file, and then return all service addresses read from a HDFS file. * * - * @param conf * @return all service addresses * @throws ServiceTrackerException */ - private static List<String> getAddressElements(TajoConf conf) throws ServiceTrackerException { + private synchronized List<String> getAddressElements() throws ServiceTrackerException { try { FileSystem fs = getFileSystem(conf); @@ -408,15 +458,34 @@ public class HdfsServiceTracker extends HAServiceTracker { } FileStatus[] files = fs.listStatus(activeMasterBaseDir); + /* wait for active master from HDFS */ + int pause = conf.getIntVar(ConfVars.TAJO_MASTER_HA_CLIENT_RETRY_PAUSE_TIME); + int maxRetry = conf.getIntVar(ConfVars.TAJO_MASTER_HA_CLIENT_RETRY_MAX_NUM); + int retry = 0; + + while (files.length < 2 && retry < maxRetry) { + try { + this.wait(pause); + } catch (InterruptedException e) { + throw new ServiceTrackerException(e); + } + files = fs.listStatus(activeMasterBaseDir); + } if (files.length < 1) { + LOG.error("Exceeded the maximum retry (" + maxRetry + ") to read TajoMaster address from HDFS"); throw new ServiceTrackerException("No active master entry"); - } else if (files.length > 1) { - throw new ServiceTrackerException("Two or more than active master entries."); + } else if (files.length > 2) { + throw new ServiceTrackerException("Three or more than active master entries."); } - // We can ensure that there is only one file due to the above assertion. - Path activeMasterEntry = files[0].getPath(); + Path activeMasterEntry = null; + + for (FileStatus eachFile : files) { + if (!eachFile.getPath().getName().equals(HAConstants.ACTIVE_LOCK_FILE)) { + activeMasterEntry = eachFile.getPath(); + } + } if (!fs.isFile(activeMasterEntry)) { throw new ServiceTrackerException("Active master entry must be a file, but it is a directory."); @@ -424,12 +493,9 @@ public class HdfsServiceTracker extends HAServiceTracker { List<String> addressElements = TUtil.newList(); - addressElements.add(activeMasterEntry.getName().replaceAll("_", ":")); // Add UMBILICAL_RPC_ADDRESS to elements - FSDataInputStream stream = fs.open(activeMasterEntry); String data = stream.readUTF(); stream.close(); - addressElements.addAll(TUtil.newList(data.split("_"))); // Add remains entries to elements // ensure the number of entries @@ -442,33 +508,8 @@ public class HdfsServiceTracker extends HAServiceTracker { } } - - public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) { - return isMasterAlive(org.apache.tajo.util.NetUtils.normalizeInetSocketAddress(masterAddress), conf); - } - - public static boolean isMasterAlive(String masterName, TajoConf conf) { - boolean isAlive = true; - - try { - // how to create sockets - SocketFactory socketFactory = org.apache.hadoop.net.NetUtils.getDefaultSocketFactory(conf); - - int connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, - CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT); - - InetSocketAddress server = org.apache.hadoop.net.NetUtils.createSocketAddr(masterName); - - // connected socket - Socket socket = socketFactory.createSocket(); - org.apache.hadoop.net.NetUtils.connect(socket, server, connectionTimeout); - } catch (Exception e) { - isAlive = false; - } - return isAlive; - } - - public static int getState(String masterName, TajoConf conf) { + @Override + public int getState(String masterName, TajoConf conf) throws ServiceTrackerException { String targetMaster = masterName.replaceAll(":", "_"); int retValue = -1; @@ -498,12 +539,13 @@ public class HdfsServiceTracker extends HAServiceTracker { } retValue = -2; } catch (Exception e) { - e.printStackTrace(); + throw new ServiceTrackerException("Cannot get HA state - ERROR:" + e.getMessage()); } return retValue; } - public static int formatHA(TajoConf conf) { + @Override + public int formatHA(TajoConf conf) throws ServiceTrackerException{ int retValue = -1; try { FileSystem fs = getFileSystem(conf); @@ -512,20 +554,20 @@ public class HdfsServiceTracker extends HAServiceTracker { Path temPath = null; int aliveMasterCount = 0; + // Check backup masters FileStatus[] files = fs.listStatus(backupPath); - for (FileStatus status : files) { - temPath = status.getPath(); - if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) { + for (FileStatus eachFile : files) { + if (checkConnection(eachFile.getPath().getName(), "_")) { aliveMasterCount++; } } // Check active master files = fs.listStatus(activePath); - if (files.length == 1) { - temPath = files[0].getPath(); - if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) { + for (FileStatus eachFile : files) { + if (!eachFile.getPath().getName().equals(HAConstants.ACTIVE_LOCK_FILE) && + checkConnection(eachFile.getPath().getName(), "_")) { aliveMasterCount++; } } @@ -539,13 +581,13 @@ public class HdfsServiceTracker extends HAServiceTracker { fs.delete(TajoConf.getSystemHADir(conf), true); retValue = 1; } catch (Exception e) { - e.printStackTrace(); + throw new ServiceTrackerException("Cannot format HA directories - ERROR:" + e.getMessage()); } return retValue; } - - public static List<String> getMasters(TajoConf conf) { + @Override + public List<String> getMasters(TajoConf conf) throws ServiceTrackerException { List<String> list = new ArrayList<String>(); try { @@ -569,7 +611,7 @@ public class HdfsServiceTracker extends HAServiceTracker { } } catch (Exception e) { - e.printStackTrace(); + throw new ServiceTrackerException("Cannot get master lists - ERROR:" + e.getMessage()); } return list; } @@ -578,4 +620,4 @@ public class HdfsServiceTracker extends HAServiceTracker { Path rootPath = TajoConf.getTajoRootDir(conf); return rootPath.getFileSystem(conf); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index 371dfb4..e6d22fe 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -228,11 +228,6 @@ public class TajoMaster extends CompositeService { } } - public boolean isActiveMaster() { - return (haService != null ? haService.isActiveStatus() : true); - } - - private void checkAndInitializeSystemDirectories() throws IOException { // Get Tajo root dir this.tajoRootPath = TajoConf.getTajoRootDir(systemConf); @@ -342,14 +337,18 @@ public class TajoMaster extends CompositeService { defaultFS.delete(systemConfPath, false); } - FSDataOutputStream out = FileSystem.create(defaultFS, systemConfPath, + // In TajoMaster HA, some master might see LeaseExpiredException because of lease mismatch. Thus, + // we need to create below xml file at HdfsServiceTracker::writeSystemConf. + if (!systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + FSDataOutputStream out = FileSystem.create(defaultFS, systemConfPath, new FsPermission(SYSTEM_CONF_FILE_PERMISSION)); - try { - systemConf.writeXml(out); - } finally { - out.close(); + try { + systemConf.writeXml(out); + } finally { + out.close(); + } + defaultFS.setReplication(systemConfPath, (short) systemConf.getIntVar(ConfVars.SYSTEM_CONF_REPLICA_COUNT)); } - defaultFS.setReplication(systemConfPath, (short) systemConf.getIntVar(ConfVars.SYSTEM_CONF_REPLICA_COUNT)); } private void checkBaseTBSpaceAndDatabase() throws IOException { http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java index aee2ced..578b15a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java @@ -197,7 +197,7 @@ public class JSPUtil { ServiceTracker haService = context.getHAService(); String activeLabel = ""; if (haService != null) { - if (haService.isActiveStatus()) { + if (haService.isActiveMaster()) { activeLabel = "<font color='#1e90ff'>(active)</font>"; } else { activeLabel = "<font color='#1e90ff'>(backup)</font>"; http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 49cb1e9..1ef01a8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 3c55add..e806def 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -35,6 +35,7 @@ import org.apache.tajo.catalog.CatalogClient; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.service.ServiceTrackerException; import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.service.TajoMasterInfo; import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary; @@ -358,6 +359,7 @@ public class TajoWorker extends CompositeService { startJvmPauseMonitor(); tajoMasterInfo = new TajoMasterInfo(); + if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { tajoMasterInfo.setTajoMasterAddress(serviceTracker.getUmbilicalAddress()); tajoMasterInfo.setWorkerResourceTrackerAddr(serviceTracker.getResourceTrackerAddress()); http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-core/src/main/resources/webapps/admin/catalogview.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp index 1ff81a6..3455d0b 100644 --- a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp +++ b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp @@ -30,9 +30,13 @@ <%@ page import="java.util.Collection" %> <%@ page import="java.util.List" %> <%@ page import="java.util.Map" %> -<%@ page import="org.apache.tajo.service.ServiceTracker" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); + + String[] masterName = master.getMasterName().split(":"); + InetSocketAddress socketAddress = new InetSocketAddress(masterName[0], Integer.parseInt(masterName[1])); + String masterLabel = socketAddress.getAddress().getHostName()+ ":" + socketAddress.getPort(); + CatalogService catalog = master.getCatalog(); String catalogType = request.getParameter("type"); @@ -63,7 +67,7 @@ ServiceTracker haService = master.getContext().getHAService(); String activeLabel = ""; if (haService != null) { - if (haService.isActiveStatus()) { + if (haService.isActiveMaster()) { activeLabel = "<font color='#1e90ff'>(active)</font>"; } else { activeLabel = "<font color='#1e90ff'>(backup)</font>"; @@ -81,7 +85,7 @@ <body> <%@ include file="header.jsp"%> <div class='contents'> - <h2>Tajo Master: <%=master.getMasterName()%> <%=activeLabel%></h2> + <h2>Tajo Master: <%=masterLabel%> <%=activeLabel%></h2> <hr/> <h3>Catalog</h3> <div> http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-core/src/main/resources/webapps/admin/cluster.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/src/main/resources/webapps/admin/cluster.jsp index aca1153..6a618b0 100644 --- a/tajo-core/src/main/resources/webapps/admin/cluster.jsp +++ b/tajo-core/src/main/resources/webapps/admin/cluster.jsp @@ -31,9 +31,15 @@ <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="java.util.*" %> <%@ page import="org.apache.tajo.service.ServiceTracker" %> +<%@ page import="java.net.InetSocketAddress" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); + + String[] masterName = master.getMasterName().split(":"); + InetSocketAddress socketAddress = new InetSocketAddress(masterName[0], Integer.parseInt(masterName[1])); + String masterLabel = socketAddress.getAddress().getHostName()+ ":" + socketAddress.getPort(); + Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers(); List<Integer> wokerKeys = new ArrayList<Integer>(workers.keySet()); Collections.sort(wokerKeys); @@ -81,7 +87,7 @@ String activeLabel = ""; if (haService != null) { - if (haService.isActiveStatus()) { + if (haService.isActiveMaster()) { activeLabel = "<font color='#1e90ff'>(active)</font>"; } else { activeLabel = "<font color='#1e90ff'>(backup)</font>"; @@ -114,7 +120,7 @@ <body> <%@ include file="header.jsp"%> <div class='contents'> - <h2>Tajo Master: <%=master.getMasterName()%> <%=activeLabel%></h2> + <h2>Tajo Master: <%=masterLabel%> <%=activeLabel%></h2> <div>Live:<%=numLiveMasters%>, Dead: <%=deadMasterHtml%>, Total: <%=masters.size()%></div> <% if (masters != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-core/src/main/resources/webapps/admin/index.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp index 0a0558e..aa7917d 100644 --- a/tajo-core/src/main/resources/webapps/admin/index.jsp +++ b/tajo-core/src/main/resources/webapps/admin/index.jsp @@ -35,10 +35,16 @@ <%@ page import="java.util.Collection" %> <%@ page import="java.util.Date" %> <%@ page import="java.util.Map" %> +<%@ page import="java.net.InetSocketAddress" %> <%@ page import="org.apache.tajo.service.ServiceTracker" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); + + String[] masterName = master.getMasterName().split(":"); + InetSocketAddress socketAddress = new InetSocketAddress(masterName[0], Integer.parseInt(masterName[1])); + String masterLabel = socketAddress.getAddress().getHostName()+ ":" + socketAddress.getPort(); + Map<Integer, Worker> workers = master.getContext().getResourceManager().getWorkers(); Map<Integer, Worker> inactiveWorkers = master.getContext().getResourceManager().getInactiveWorkers(); @@ -91,7 +97,7 @@ String activeLabel = ""; if (haService != null) { - if (haService.isActiveStatus()) { + if (haService.isActiveMaster()) { activeLabel = "<font color='#1e90ff'>(active)</font>"; } else { activeLabel = "<font color='#1e90ff'>(backup)</font>"; @@ -122,7 +128,7 @@ <body> <%@ include file="header.jsp"%> <div class='contents'> - <h2>Tajo Master: <%=master.getMasterName()%> <%=activeLabel%></h2> + <h2>Tajo Master: <%=masterLabel%> <%=activeLabel%></h2> <hr/> <h3>Master Status</h3> <table border='0'> http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-core/src/main/resources/webapps/admin/query.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp index 85f7176..894b9d0 100644 --- a/tajo-core/src/main/resources/webapps/admin/query.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query.jsp @@ -29,10 +29,15 @@ <%@ page import="java.util.*" %> <%@ page import="org.apache.tajo.util.history.HistoryReader" %> <%@ page import="org.apache.tajo.master.QueryInfo" %> +<%@ page import="java.net.InetSocketAddress" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); + String[] masterName = master.getMasterName().split(":"); + InetSocketAddress socketAddress = new InetSocketAddress(masterName[0], Integer.parseInt(masterName[1])); + String masterLabel = socketAddress.getAddress().getHostName()+ ":" + socketAddress.getPort(); + List<QueryInProgress> runningQueries = new ArrayList<QueryInProgress>(master.getContext().getQueryJobManager().getSubmittedQueries()); @@ -111,7 +116,7 @@ <body> <%@ include file="header.jsp"%> <div class='contents'> - <h2>Tajo Master: <%=master.getMasterName()%> <%=JSPUtil.getMasterActiveLabel(master.getContext())%></h2> + <h2>Tajo Master: <%=masterLabel%> <%=JSPUtil.getMasterActiveLabel(master.getContext())%></h2> <hr/> <h3>Running Queries</h3> <% http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-core/src/main/resources/webapps/admin/query_executor.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp index a0f9a0a..1a58583 100644 --- a/tajo-core/src/main/resources/webapps/admin/query_executor.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query_executor.jsp @@ -22,14 +22,19 @@ <%@ page import="org.apache.tajo.service.ServiceTracker" %> <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %> <%@ page import="javax.xml.ws.Service" %> +<%@ page import="java.net.InetSocketAddress" %> <% TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object"); + String[] masterName = master.getMasterName().split(":"); + InetSocketAddress socketAddress = new InetSocketAddress(masterName[0], Integer.parseInt(masterName[1])); + String masterLabel = socketAddress.getAddress().getHostName()+ ":" + socketAddress.getPort(); + ServiceTracker haService = master.getContext().getHAService(); String activeLabel = ""; if (haService != null) { - if (haService.isActiveStatus()) { + if (haService.isActiveMaster()) { activeLabel = "<font color='#1e90ff'>(active)</font>"; } else { activeLabel = "<font color='#1e90ff'>(backup)</font>"; @@ -288,7 +293,7 @@ function getPage() { <body> <%@ include file="header.jsp"%> <div class='contents'> - <h2>Tajo Master: <%=master.getMasterName()%> <%=activeLabel%></h2> + <h2>Tajo Master: <%=masterLabel%> <%=activeLabel%></h2> <hr/> <h3>Query</h3> Database : http://git-wip-us.apache.org/repos/asf/tajo/blob/19554d80/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java index 7c91e22..c8ddc03 100644 --- a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java +++ b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java @@ -30,6 +30,7 @@ import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; import org.junit.Test; +import static junit.framework.Assert.assertTrue; import static junit.framework.TestCase.assertEquals; import static org.junit.Assert.*; @@ -68,12 +69,12 @@ public class TestHAServiceHDFSImpl { verifySystemDirectories(fs); - Path backupMasterFile = new Path(backupPath, backupMaster.getMasterName() - .replaceAll(":", "_")); - assertTrue(fs.exists(backupMasterFile)); + assertEquals(2, fs.listStatus(activePath).length); + assertEquals(1, fs.listStatus(backupPath).length); - assertTrue(cluster.getMaster().isActiveMaster()); - assertFalse(backupMaster.isActiveMaster()); + assertTrue(fs.exists(new Path(activePath, HAConstants.ACTIVE_LOCK_FILE))); + assertTrue(fs.exists(new Path(activePath, cluster.getMaster().getMasterName().replaceAll(":", "_")))); + assertTrue(fs.exists(new Path(backupPath, backupMaster.getMasterName().replaceAll(":", "_")))); createDatabaseAndTable(); verifyDataBaseAndTable(); @@ -81,13 +82,14 @@ public class TestHAServiceHDFSImpl { cluster.getMaster().stop(); - Thread.sleep(7000); - - assertFalse(cluster.getMaster().isActiveMaster()); - assertTrue(backupMaster.isActiveMaster()); - client = cluster.newTajoClient(); verifyDataBaseAndTable(); + + assertEquals(2, fs.listStatus(activePath).length); + assertEquals(0, fs.listStatus(backupPath).length); + + assertTrue(fs.exists(new Path(activePath, HAConstants.ACTIVE_LOCK_FILE))); + assertTrue(fs.exists(new Path(activePath, backupMaster.getMasterName().replaceAll(":", "_")))); } finally { client.close(); backupMaster.stop(); @@ -110,6 +112,7 @@ public class TestHAServiceHDFSImpl { masterAddress + ":" + NetUtils.getFreeSocketPort()); conf.setBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE, true); + conf.setIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL, 1000); //Client API service RPC Server conf.setIntVar(TajoConf.ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2); @@ -132,9 +135,6 @@ public class TestHAServiceHDFSImpl { backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); assertTrue(fs.exists(backupPath)); - - assertEquals(1, fs.listStatus(activePath).length); - assertEquals(1, fs.listStatus(backupPath).length); } private void createDatabaseAndTable() throws Exception {
