Repository: tajo Updated Branches: refs/heads/master e025e3cbe -> 2548768cf
TAJO-1221: HA TajoClient should not connect TajoMaster at the first. (jaehwa) Closes #286 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2548768c Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2548768c Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2548768c Branch: refs/heads/master Commit: 2548768cfed8f018727f6f054317b2adb4d7f485 Parents: e025e3c Author: JaeHwa Jung <[email protected]> Authored: Thu Dec 18 11:41:13 2014 +0900 Committer: JaeHwa Jung <[email protected]> Committed: Thu Dec 18 11:43:06 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../tajo/catalog/AbstractCatalogClient.java | 2 +- .../org/apache/tajo/cli/tools/TajoAdmin.java | 2 +- .../org/apache/tajo/cli/tools/TajoHAAdmin.java | 2 +- .../apache/tajo/client/SessionConnection.java | 2 +- .../org/apache/tajo/client/TajoClientImpl.java | 5 +- .../apache/tajo/client/TajoHAClientUtil.java | 20 +- .../java/org/apache/tajo/ha/HAConstants.java | 27 ++ .../java/org/apache/tajo/ha/HAServiceUtil.java | 292 ++++++++++++++++++ .../org/apache/tajo/util/HAServiceUtil.java | 293 ------------------- .../apache/tajo/master/TajoContainerProxy.java | 2 +- .../tajo/master/ha/HAServiceHDFSImpl.java | 100 +++++-- .../master/querymaster/QueryJobManager.java | 5 +- .../tajo/master/querymaster/QueryMaster.java | 2 +- .../master/querymaster/QueryMasterTask.java | 2 +- .../tajo/worker/TajoResourceAllocator.java | 2 +- .../java/org/apache/tajo/worker/TajoWorker.java | 1 + .../tajo/worker/WorkerHeartbeatService.java | 2 +- .../ConnectivityCheckerRuleForTajoWorker.java | 2 +- .../org/apache/tajo/TajoTestingCluster.java | 3 + .../tajo/master/ha/TestHAServiceHDFSImpl.java | 115 +++++--- 21 files changed, 494 insertions(+), 389 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 65492d2..de63285 100644 --- a/CHANGES +++ b/CHANGES @@ -24,6 +24,8 @@ Release 0.9.1 - unreleased IMPROVEMENT + TAJO-1221: HA TajoClient should not connect TajoMaster at the first. (jaehwa) + TAJO-1241: Change default client and table time zone behavior. (hyunsik) TAJO-1243: *-site.xml.template should have default configs commented out. http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java index dde6980..6b50115 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java @@ -33,7 +33,7 @@ import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto; -import org.apache.tajo.util.HAServiceUtil; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.util.ProtoUtil; import java.net.InetSocketAddress; http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java index 5ef8d76..88b8e0f 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java @@ -28,7 +28,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo; import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo; import org.apache.tajo.util.NetUtils; -import org.apache.tajo.util.HAServiceUtil; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.util.TajoIdUtils; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java b/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java index 12d94ad..ef8fee9 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/tools/TajoHAAdmin.java @@ -24,7 +24,7 @@ import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.client.TajoHAClientUtil; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.util.HAServiceUtil; +import org.apache.tajo.ha.HAServiceUtil; import java.io.IOException; import java.io.PrintWriter; http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index 44b772b..db2bd2a 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -33,7 +33,7 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.ServerCallable; -import org.apache.tajo.util.HAServiceUtil; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java index dff8d65..8eafc91 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java @@ -33,6 +33,7 @@ import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.jdbc.TajoResultSet; @@ -56,11 +57,11 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que CatalogAdminClient catalogClient; public TajoClientImpl(TajoConf conf) throws IOException { - this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null); + this(conf, TajoHAClientUtil.getRpcClientAddress(conf), null); } public TajoClientImpl(TajoConf conf, @Nullable String baseDatabase) throws IOException { - this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), baseDatabase); + this(conf, TajoHAClientUtil.getRpcClientAddress(conf), baseDatabase); } public TajoClientImpl(InetSocketAddress addr) throws IOException { http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java index b95fb35..12a9ec8 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java @@ -39,12 +39,13 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; import org.apache.tajo.cli.tsql.TajoCli.TajoCliContext; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.util.HAServiceUtil; +import org.apache.tajo.ha.HAServiceUtil; +import org.apache.tajo.util.NetUtils; import java.io.IOException; +import java.net.InetSocketAddress; public class TajoHAClientUtil { - /** * In TajoMaster HA mode, if TajoCli can't connect existing active master, * this should try to connect new active master. @@ -65,11 +66,11 @@ public class TajoHAClientUtil { if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { if (!HAServiceUtil.isMasterAlive(conf.getVar( - TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS), conf)) { + TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS), conf)) { TajoClient tajoClient = null; String baseDatabase = client.getBaseDatabase(); conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, - HAServiceUtil.getMasterClientName(conf)); + HAServiceUtil.getMasterClientName(conf)); client.close(); tajoClient = new TajoClientImpl(conf, baseDatabase); @@ -84,4 +85,15 @@ public class TajoHAClientUtil { return client; } } + + + public static InetSocketAddress getRpcClientAddress(TajoConf conf) { + if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + return NetUtils.createSocketAddr(HAServiceUtil.getMasterClientName(conf)); + } else { + return NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars + .TAJO_MASTER_CLIENT_RPC_ADDRESS)); + } + } + } http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java b/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java new file mode 100644 index 0000000..c5f4b8a --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/ha/HAConstants.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ha; + +public class HAConstants { + public final static int MASTER_UMBILICAL_RPC_ADDRESS = 1; + public final static int MASTER_CLIENT_RPC_ADDRESS = 2; + public final static int RESOURCE_TRACKER_RPC_ADDRESS = 3; + public final static int CATALOG_ADDRESS = 4; + public final static int MASTER_INFO_ADDRESS = 5; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java b/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java new file mode 100644 index 0000000..b62d73b --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ha; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.*; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.util.NetUtils; + + +import javax.net.SocketFactory; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; + +public class HAServiceUtil { + private static Log LOG = LogFactory.getLog(HAServiceUtil.class); + + public static InetSocketAddress getMasterUmbilicalAddress(TajoConf conf) { + return getMasterAddress(conf, HAConstants.MASTER_UMBILICAL_RPC_ADDRESS); + } + + public static String getMasterUmbilicalName(TajoConf conf) { + return NetUtils.normalizeInetSocketAddress(getMasterUmbilicalAddress(conf)); + } + + public static InetSocketAddress getMasterClientAddress(TajoConf conf) { + return getMasterAddress(conf, HAConstants.MASTER_CLIENT_RPC_ADDRESS); + } + + public static String getMasterClientName(TajoConf conf) { + return NetUtils.normalizeInetSocketAddress(getMasterClientAddress(conf)); + } + + public static InetSocketAddress getResourceTrackerAddress(TajoConf conf) { + return getMasterAddress(conf, HAConstants.RESOURCE_TRACKER_RPC_ADDRESS); + } + + public static String getResourceTrackerName(TajoConf conf) { + return NetUtils.normalizeInetSocketAddress(getResourceTrackerAddress(conf)); + } + + public static InetSocketAddress getCatalogAddress(TajoConf conf) { + return getMasterAddress(conf, HAConstants.CATALOG_ADDRESS); + } + + public static String getCatalogName(TajoConf conf) { + return NetUtils.normalizeInetSocketAddress(getCatalogAddress(conf)); + } + + public static InetSocketAddress getMasterInfoAddress(TajoConf conf) { + return getMasterAddress(conf, HAConstants.MASTER_INFO_ADDRESS); + } + + public static String getMasterInfoName(TajoConf conf) { + return NetUtils.normalizeInetSocketAddress(getMasterInfoAddress(conf)); + } + + public static InetSocketAddress getMasterAddress(TajoConf conf, int type) { + InetSocketAddress masterAddress = null; + + if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + try { + FileSystem fs = getFileSystem(conf); + Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); + + if (fs.exists(activePath)) { + FileStatus[] files = fs.listStatus(activePath); + + if (files.length == 1) { + Path file = files[0].getPath(); + String hostAddress = file.getName().replaceAll("_", ":"); + FSDataInputStream stream = fs.open(file); + String data = stream.readUTF(); + stream.close(); + + String[] addresses = data.split("_"); + + switch (type) { + case 1: + masterAddress = NetUtils.createSocketAddr(hostAddress); + break; + case 2: + masterAddress = NetUtils.createSocketAddr(addresses[0]); + break; + case 3: + masterAddress = NetUtils.createSocketAddr(addresses[1]); + break; + case 4: + masterAddress = NetUtils.createSocketAddr(addresses[2]); + break; + case 5: + masterAddress = NetUtils.createSocketAddr(addresses[3]); + break; + default: + break; + } + } + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + + if (masterAddress == null) { + switch (type) { + case 1: + masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars + .TAJO_MASTER_UMBILICAL_RPC_ADDRESS)); + break; + case 2: + masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars + .TAJO_MASTER_CLIENT_RPC_ADDRESS)); + break; + case 3: + masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars + .RESOURCE_TRACKER_RPC_ADDRESS)); + break; + case 4: + masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars + .CATALOG_ADDRESS)); + break; + case 5: + masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars + .TAJO_MASTER_INFO_ADDRESS)); + break; + default: + break; + } + } + + return masterAddress; + } + + public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) { + return isMasterAlive(NetUtils.normalizeInetSocketAddress(masterAddress), conf); + } + + public static boolean isMasterAlive(String masterName, TajoConf conf) { + boolean isAlive = true; + + try { + // how to create sockets + SocketFactory socketFactory = org.apache.hadoop.net.NetUtils.getDefaultSocketFactory(conf); + + int connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, + CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT); + + InetSocketAddress server = org.apache.hadoop.net.NetUtils.createSocketAddr(masterName); + + // connected socket + Socket socket = socketFactory.createSocket(); + org.apache.hadoop.net.NetUtils.connect(socket, server, connectionTimeout); + } catch (Exception e) { + isAlive = false; + } + return isAlive; + } + + public static int getState(String masterName, TajoConf conf) { + String targetMaster = masterName.replaceAll(":", "_"); + int retValue = -1; + + try { + FileSystem fs = getFileSystem(conf); + Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); + Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); + + Path temPath = null; + + // Check backup masters + FileStatus[] files = fs.listStatus(backupPath); + for (FileStatus status : files) { + temPath = status.getPath(); + if (temPath.getName().equals(targetMaster)) { + return 0; + } + } + + // Check active master + files = fs.listStatus(activePath); + if (files.length == 1) { + temPath = files[0].getPath(); + if (temPath.getName().equals(targetMaster)) { + return 1; + } + } + retValue = -2; + } catch (Exception e) { + e.printStackTrace(); + } + return retValue; + } + + public static int formatHA(TajoConf conf) { + int retValue = -1; + try { + FileSystem fs = getFileSystem(conf); + Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); + Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); + Path temPath = null; + + int aliveMasterCount = 0; + // Check backup masters + FileStatus[] files = fs.listStatus(backupPath); + for (FileStatus status : files) { + temPath = status.getPath(); + if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) { + aliveMasterCount++; + } + } + + // Check active master + files = fs.listStatus(activePath); + if (files.length == 1) { + temPath = files[0].getPath(); + if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) { + aliveMasterCount++; + } + } + + // If there is any alive master, users can't format storage. + if (aliveMasterCount > 0) { + return 0; + } + + // delete ha path. + fs.delete(TajoConf.getSystemHADir(conf), true); + retValue = 1; + } catch (Exception e) { + e.printStackTrace(); + } + return retValue; + } + + + public static List<String> getMasters(TajoConf conf) { + List<String> list = new ArrayList<String>(); + + try { + FileSystem fs = getFileSystem(conf); + Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); + Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); + Path temPath = null; + + // Check backup masters + FileStatus[] files = fs.listStatus(backupPath); + for (FileStatus status : files) { + temPath = status.getPath(); + list.add(temPath.getName().replaceAll("_", ":")); + } + + // Check active master + files = fs.listStatus(activePath); + if (files.length == 1) { + temPath = files[0].getPath(); + list.add(temPath.getName().replaceAll("_", ":")); + } + + } catch (Exception e) { + e.printStackTrace(); + } + return list; + } + + private static FileSystem getFileSystem(TajoConf conf) throws IOException { + Path rootPath = TajoConf.getTajoRootDir(conf); + return rootPath.getFileSystem(conf); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-common/src/main/java/org/apache/tajo/util/HAServiceUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/HAServiceUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/HAServiceUtil.java deleted file mode 100644 index 4f03113..0000000 --- a/tajo-common/src/main/java/org/apache/tajo/util/HAServiceUtil.java +++ /dev/null @@ -1,293 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.util; - -import org.apache.hadoop.fs.*; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.conf.TajoConf; - -import javax.net.SocketFactory; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.ArrayList; -import java.util.List; - -public class HAServiceUtil { - - private final static int MASTER_UMBILICAL_RPC_ADDRESS = 1; - private final static int MASTER_CLIENT_RPC_ADDRESS = 2; - private final static int RESOURCE_TRACKER_RPC_ADDRESS = 3; - private final static int CATALOG_ADDRESS = 4; - private final static int MASTER_INFO_ADDRESS = 5; - - public static InetSocketAddress getMasterUmbilicalAddress(TajoConf conf) { - return getMasterAddress(conf, MASTER_UMBILICAL_RPC_ADDRESS); - } - - public static String getMasterUmbilicalName(TajoConf conf) { - return NetUtils.normalizeInetSocketAddress(getMasterUmbilicalAddress(conf)); - } - - public static InetSocketAddress getMasterClientAddress(TajoConf conf) { - return getMasterAddress(conf, MASTER_CLIENT_RPC_ADDRESS); - } - - public static String getMasterClientName(TajoConf conf) { - return NetUtils.normalizeInetSocketAddress(getMasterClientAddress(conf)); - } - - public static InetSocketAddress getResourceTrackerAddress(TajoConf conf) { - return getMasterAddress(conf, RESOURCE_TRACKER_RPC_ADDRESS); - } - - public static String getResourceTrackerName(TajoConf conf) { - return NetUtils.normalizeInetSocketAddress(getResourceTrackerAddress(conf)); - } - - public static InetSocketAddress getCatalogAddress(TajoConf conf) { - return getMasterAddress(conf, CATALOG_ADDRESS); - } - - public static String getCatalogName(TajoConf conf) { - return NetUtils.normalizeInetSocketAddress(getCatalogAddress(conf)); - } - - public static InetSocketAddress getMasterInfoAddress(TajoConf conf) { - return getMasterAddress(conf, MASTER_INFO_ADDRESS); - } - - public static String getMasterInfoName(TajoConf conf) { - return NetUtils.normalizeInetSocketAddress(getMasterInfoAddress(conf)); - } - - public static InetSocketAddress getMasterAddress(TajoConf conf, int type) { - InetSocketAddress masterAddress = null; - - if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - FileSystem fs = getFileSystem(conf); - Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); - - if (fs.exists(activePath)) { - FileStatus[] files = fs.listStatus(activePath); - - if (files.length == 1) { - Path file = files[0].getPath(); - String hostAddress = file.getName().replaceAll("_", ":"); - FSDataInputStream stream = fs.open(file); - String data = stream.readUTF(); - stream.close(); - - String[] addresses = data.split("_"); - - switch (type) { - case 1: - masterAddress = NetUtils.createSocketAddr(hostAddress); - break; - case 2: - masterAddress = NetUtils.createSocketAddr(addresses[0]); - break; - case 3: - masterAddress = NetUtils.createSocketAddr(addresses[1]); - break; - case 4: - masterAddress = NetUtils.createSocketAddr(addresses[2]); - break; - case 5: - masterAddress = NetUtils.createSocketAddr(addresses[3]); - break; - default: - break; - } - } - } - - } catch (Exception e) { - e.printStackTrace(); - } - } - - if (masterAddress == null) { - switch (type) { - case 1: - masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars - .TAJO_MASTER_UMBILICAL_RPC_ADDRESS)); - break; - case 2: - masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars - .TAJO_MASTER_CLIENT_RPC_ADDRESS)); - break; - case 3: - masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars - .RESOURCE_TRACKER_RPC_ADDRESS)); - break; - case 4: - masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars - .CATALOG_ADDRESS)); - break; - case 5: - masterAddress = NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars - .TAJO_MASTER_INFO_ADDRESS)); - break; - default: - break; - } - } - - return masterAddress; - } - - public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) { - return isMasterAlive(NetUtils.normalizeInetSocketAddress(masterAddress), conf); - } - - public static boolean isMasterAlive(String masterName, TajoConf conf) { - boolean isAlive = true; - - try { - // how to create sockets - SocketFactory socketFactory = org.apache.hadoop.net.NetUtils.getDefaultSocketFactory(conf); - - int connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, - CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT); - - InetSocketAddress server = org.apache.hadoop.net.NetUtils.createSocketAddr(masterName); - - // connected socket - Socket socket = socketFactory.createSocket(); - org.apache.hadoop.net.NetUtils.connect(socket, server, connectionTimeout); - } catch (Exception e) { - isAlive = false; - } - return isAlive; - } - - public static int getState(String masterName, TajoConf conf) { - String targetMaster = masterName.replaceAll(":", "_"); - int retValue = -1; - - try { - FileSystem fs = getFileSystem(conf); - Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); - Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); - - Path temPath = null; - - // Check backup masters - FileStatus[] files = fs.listStatus(backupPath); - for (FileStatus status : files) { - temPath = status.getPath(); - if (temPath.getName().equals(targetMaster)) { - return 0; - } - } - - // Check active master - files = fs.listStatus(activePath); - if (files.length == 1) { - temPath = files[0].getPath(); - if (temPath.getName().equals(targetMaster)) { - return 1; - } - } - retValue = -2; - } catch (Exception e) { - e.printStackTrace(); - } - return retValue; - } - - public static int formatHA(TajoConf conf) { - int retValue = -1; - try { - FileSystem fs = getFileSystem(conf); - Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); - Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); - Path temPath = null; - - int aliveMasterCount = 0; - // Check backup masters - FileStatus[] files = fs.listStatus(backupPath); - for (FileStatus status : files) { - temPath = status.getPath(); - if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) { - aliveMasterCount++; - } - } - - // Check active master - files = fs.listStatus(activePath); - if (files.length == 1) { - temPath = files[0].getPath(); - if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) { - aliveMasterCount++; - } - } - - // If there is any alive master, users can't format storage. - if (aliveMasterCount > 0) { - return 0; - } - - // delete ha path. - fs.delete(TajoConf.getSystemHADir(conf), true); - retValue = 1; - } catch (Exception e) { - e.printStackTrace(); - } - return retValue; - } - - - public static List<String> getMasters(TajoConf conf) { - List<String> list = new ArrayList<String>(); - - try { - FileSystem fs = getFileSystem(conf); - Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); - Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); - Path temPath = null; - - // Check backup masters - FileStatus[] files = fs.listStatus(backupPath); - for (FileStatus status : files) { - temPath = status.getPath(); - list.add(temPath.getName().replaceAll("_", ":")); - } - - // Check active master - files = fs.listStatus(activePath); - if (files.length == 1) { - temPath = files[0].getPath(); - list.add(temPath.getName().replaceAll("_", ":")); - } - - } catch (Exception e) { - e.printStackTrace(); - } - return list; - } - - private static FileSystem getFileSystem(TajoConf conf) throws IOException { - Path rootPath = TajoConf.getTajoRootDir(conf); - return rootPath.getFileSystem(conf); - } - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index 158316e..0d2acf7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -36,7 +36,7 @@ import org.apache.tajo.master.rm.TajoWorkerContainerId; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; -import org.apache.tajo.util.HAServiceUtil; +import org.apache.tajo.ha.HAServiceUtil; import java.net.InetSocketAddress; import java.util.ArrayList; http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java index 26bc97b..45219b3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/ha/HAServiceHDFSImpl.java @@ -25,12 +25,14 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.net.NetUtils; import org.apache.tajo.TajoConstants; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ha.HAConstants; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.TajoMaster.MasterContext; -import org.apache.tajo.util.HAServiceUtil; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.util.TUtil; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.List; /** @@ -65,7 +67,10 @@ public class HAServiceHDFSImpl implements HAService { this.context = context; this.conf = context.getConf(); initSystemDirectory(); - this.masterName = conf.get(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS.varname); + + InetSocketAddress socketAddress = context.getTajoMasterService().getBindAddress(); + this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort(); + monitorInterval = conf.getIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL); } @@ -133,7 +138,6 @@ public class HAServiceHDFSImpl implements HAService { } private void createMasterFile(boolean isActive) throws IOException { - String hostName = masterName.split(":")[0]; String fileName = masterName.replaceAll(":", "_"); Path path = null; @@ -144,20 +148,27 @@ public class HAServiceHDFSImpl implements HAService { } StringBuilder sb = new StringBuilder(); - sb.append(context.getConf().get(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS.varname, - hostName + ":26002")); - sb.append("_"); - sb.append(context.getConf().get(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS.varname, - hostName + ":26003")); - sb.append("_"); - sb.append(context.getConf().get(TajoConf.ConfVars.CATALOG_ADDRESS.varname, hostName + ":26005")); - sb.append("_"); - sb.append(context.getConf().get(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS.varname, - hostName + ":26080")); + InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); + + address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); + + address = getHostAddress(HAConstants.CATALOG_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); + + address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()); FSDataOutputStream out = fs.create(path); - out.writeUTF(sb.toString()); - out.close(); + + try { + out.writeUTF(sb.toString()); + out.hflush(); + out.close(); + } catch (FileAlreadyExistsException e) { + createMasterFile(false); + } if (isActive) { isActiveStatus = true; @@ -169,6 +180,36 @@ public class HAServiceHDFSImpl implements HAService { } + private InetSocketAddress getHostAddress(int type) { + InetSocketAddress address = null; + + switch (type) { + case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS: + address = context.getConf().getSocketAddrVar(TajoConf.ConfVars + .TAJO_MASTER_UMBILICAL_RPC_ADDRESS); + break; + case HAConstants.MASTER_CLIENT_RPC_ADDRESS: + address = context.getConf().getSocketAddrVar(TajoConf.ConfVars + .TAJO_MASTER_CLIENT_RPC_ADDRESS); + break; + case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS: + address = context.getConf().getSocketAddrVar(TajoConf.ConfVars + .RESOURCE_TRACKER_RPC_ADDRESS); + break; + case HAConstants.CATALOG_ADDRESS: + address = context.getConf().getSocketAddrVar(TajoConf.ConfVars + .CATALOG_ADDRESS); + break; + case HAConstants.MASTER_INFO_ADDRESS: + address = context.getConf().getSocketAddrVar(TajoConf.ConfVars + .TAJO_MASTER_INFO_ADDRESS); + default: + break; + } + + return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort()); + } + @Override public void delete() throws IOException { String fileName = masterName.replaceAll(":", "_"); @@ -196,9 +237,6 @@ public class HAServiceHDFSImpl implements HAService { @Override public List<TajoMasterInfo> getMasters() throws IOException { List<TajoMasterInfo> list = TUtil.newList(); - boolean isAlive = false; - TajoMasterInfo info = null; - String hostAddress = null; Path path = null; FileStatus[] files = fs.listStatus(activePath); @@ -246,20 +284,22 @@ public class HAServiceHDFSImpl implements HAService { while (!stopped && !Thread.currentThread().isInterrupted()) { synchronized (HAServiceHDFSImpl.this) { try { - boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf); - if (LOG.isDebugEnabled()) { - LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName + if (!currentActiveMaster.equals(masterName)) { + boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf); + if (LOG.isDebugEnabled()) { + LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName + ", isAlive:" + isAlive); - } + } - // If active master is dead, this master should be active master instead of - // previous active master. - if (!isAlive) { - FileStatus[] files = fs.listStatus(activePath); - if (files.length == 0 || (files.length == 1 - && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) { - delete(); - register(); + // If active master is dead, this master should be active master instead of + // previous active master. + if (!isAlive) { + FileStatus[] files = fs.listStatus(activePath); + if (files.length == 0 || (files.length == 1 + && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) { + delete(); + register(); + } } } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java index 536f6ac..ddbd3e1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java @@ -19,6 +19,7 @@ package org.apache.tajo.master.querymaster; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -52,9 +53,9 @@ public class QueryJobManager extends CompositeService { private SimpleFifoScheduler scheduler; - private final Map<QueryId, QueryInProgress> submittedQueries = new HashMap<QueryId, QueryInProgress>(); + private final Map<QueryId, QueryInProgress> submittedQueries = Maps.newConcurrentMap(); - private final Map<QueryId, QueryInProgress> runningQueries = new HashMap<QueryId, QueryInProgress>(); + private final Map<QueryId, QueryInProgress> runningQueries = Maps.newConcurrentMap(); private AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE); private AtomicLong maxExecutionTime = new AtomicLong(); http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index 42fac3a..7ddd787 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -32,6 +32,7 @@ import org.apache.tajo.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.TajoAsyncDispatcher; @@ -41,7 +42,6 @@ import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; -import org.apache.tajo.util.HAServiceUtil; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.history.QueryHistory; import org.apache.tajo.worker.TajoWorker; http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 1eaef0f..8f63416 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -64,7 +64,7 @@ import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.StorageProperty; import org.apache.tajo.storage.StorageUtil; -import org.apache.tajo.util.HAServiceUtil; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.util.metrics.TajoMetrics; import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter; import org.apache.tajo.worker.AbstractResourceAllocator; http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 9345885..f055733 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -47,7 +47,7 @@ import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.util.ApplicationIdUtils; -import org.apache.tajo.util.HAServiceUtil; +import org.apache.tajo.ha.HAServiceUtil; import java.net.InetSocketAddress; import java.util.*; http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 0f0c1f9..4d96529 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -35,6 +35,7 @@ import org.apache.tajo.TajoConstants; import org.apache.tajo.catalog.CatalogClient; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.ha.TajoMasterInfo; http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java index f12e83c..c809921 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java @@ -35,7 +35,7 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.DiskDeviceInfo; import org.apache.tajo.storage.DiskMountInfo; import org.apache.tajo.storage.DiskUtil; -import org.apache.tajo.util.HAServiceUtil; +import org.apache.tajo.ha.HAServiceUtil; import java.io.File; import java.util.List; http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java index 328a31b..6eb710a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; @@ -31,7 +32,6 @@ import org.apache.tajo.rule.SelfDiagnosisRuleDefinition; import org.apache.tajo.rule.SelfDiagnosisRuleVisibility; import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode; import org.apache.tajo.rule.SelfDiagnosisRule; -import org.apache.tajo.util.HAServiceUtil; import org.apache.tajo.util.NetUtils; import org.apache.tajo.worker.TajoWorker; http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 64c27e0..9868297 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -157,7 +157,10 @@ public class TajoTestingCluster { if (!StringUtils.isEmpty(LOG_LEVEL)) { Level defaultLevel = Logger.getRootLogger().getLevel(); Logger.getLogger("org.apache.tajo").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel)); + Logger.getLogger("org.apache.tajo.master.TajoAsyncDispatcher").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), + defaultLevel)); Logger.getLogger("org.apache.hadoop").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel)); + Logger.getLogger("org.apache.zookeeper").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel)); Logger.getLogger("BlockStateChange").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel)); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2548768c/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java index 69ed556..e1806e1 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java @@ -26,12 +26,16 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.master.TajoMaster; import org.junit.Test; +import java.util.List; + import static junit.framework.TestCase.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -41,86 +45,90 @@ public class TestHAServiceHDFSImpl { private static Log LOG = LogFactory.getLog(TestHAServiceHDFSImpl.class); private TajoTestingCluster cluster; - private TajoMaster backupMaster1, backupMaster2; + private TajoMaster backupMaster; private TajoConf conf; private TajoClient client; - private Path testDir; private Path haPath, activePath, backupPath; - private static final String LOCAL_HOST = "localhost:"; + private String masterAddress; @Test - public final void testTwoBackupMasters() throws Exception { + public final void testAutoFailOver() throws Exception { cluster = new TajoTestingCluster(true); - cluster.startMiniCluster(1); + cluster.startMiniCluster(1); conf = cluster.getConfiguration(); client = new TajoClientImpl(conf); + try { FileSystem fs = cluster.getDefaultFileSystem(); - startBackupMasters(); - verifyMasterAddress(); - verifySystemDirectories(fs); + masterAddress = HAServiceUtil.getMasterUmbilicalName(conf).split(":")[0]; + + setConfiguration(); - Path backupMasterFile1 = new Path(backupPath, backupMaster1.getMasterName() - .replaceAll(":", "_")); - assertTrue(fs.exists(backupMasterFile1)); + backupMaster = new TajoMaster(); + backupMaster.init(conf); + backupMaster.start(); - Path backupMasterFile2 = new Path(backupPath, backupMaster2.getMasterName() - .replaceAll(":", "_")); - assertTrue(fs.exists(backupMasterFile2)); + assertNotEquals(cluster.getMaster().getMasterName(), backupMaster.getMasterName()); + + verifySystemDirectories(fs); + + Path backupMasterFile = new Path(backupPath, backupMaster.getMasterName() + .replaceAll(":", "_")); + assertTrue(fs.exists(backupMasterFile)); assertTrue(cluster.getMaster().isActiveMaster()); - assertFalse(backupMaster1.isActiveMaster()); - assertFalse(backupMaster2.isActiveMaster()); + assertFalse(backupMaster.isActiveMaster()); + + createDatabaseAndTable(); + verifyDataBaseAndTable(); + client.close(); + + cluster.getMaster().stop(); + + Thread.sleep(7000); + + assertFalse(cluster.getMaster().isActiveMaster()); + assertTrue(backupMaster.isActiveMaster()); + + client = new TajoClientImpl(conf); + verifyDataBaseAndTable(); } finally { - IOUtils.cleanup(LOG, client, backupMaster1, backupMaster2); + client.close(); + backupMaster.stop(); cluster.shutdownMiniCluster(); } } - private void startBackupMasters() throws Exception { - + private void setConfiguration() { conf = cluster.getConfiguration(); - conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, - LOCAL_HOST + NetUtils.getFreeSocketPort()); - conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, - LOCAL_HOST + NetUtils.getFreeSocketPort()); - conf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, - LOCAL_HOST + NetUtils.getFreeSocketPort()); - conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS, - LOCAL_HOST + NetUtils.getFreeSocketPort()); - conf.setBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE, true); - - backupMaster1 = new TajoMaster(); - backupMaster1.init(conf); - backupMaster1.start(); conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, - LOCAL_HOST + NetUtils.getFreeSocketPort()); + masterAddress + ":" + NetUtils.getFreeSocketPort()); conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS, - LOCAL_HOST + NetUtils.getFreeSocketPort()); + masterAddress + ":" + NetUtils.getFreeSocketPort()); conf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, - LOCAL_HOST + NetUtils.getFreeSocketPort()); + masterAddress + ":" + NetUtils.getFreeSocketPort()); conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS, - LOCAL_HOST + NetUtils.getFreeSocketPort()); + masterAddress + ":" + NetUtils.getFreeSocketPort()); + conf.setVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS, + masterAddress + ":" + NetUtils.getFreeSocketPort()); conf.setBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE, true); - backupMaster2 = new TajoMaster(); - backupMaster2.init(conf); - backupMaster2.start(); - } + //Client API service RPC Server + conf.setIntVar(TajoConf.ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2); + conf.setIntVar(TajoConf.ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM, 2); - private void verifyMasterAddress() { - assertNotEquals(cluster.getMaster().getMasterName(), - backupMaster1.getMasterName()); - assertNotEquals(cluster.getMaster().getMasterName(), - backupMaster2.getMasterName()); - assertNotEquals(backupMaster1.getMasterName(), - backupMaster2.getMasterName()); + // Internal RPC Server + conf.setIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2); + conf.setIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM, 2); + conf.setIntVar(TajoConf.ConfVars.WORKER_RPC_SERVER_WORKER_THREAD_NUM, 2); + conf.setIntVar(TajoConf.ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM, 2); + conf.setIntVar(TajoConf.ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2); } private void verifySystemDirectories(FileSystem fs) throws Exception { @@ -134,6 +142,17 @@ public class TestHAServiceHDFSImpl { assertTrue(fs.exists(backupPath)); assertEquals(1, fs.listStatus(activePath).length); - assertEquals(2, fs.listStatus(backupPath).length); + assertEquals(1, fs.listStatus(backupPath).length); + } + + private void createDatabaseAndTable() throws Exception { + client.executeQuery("CREATE TABLE default.table1 (age int);"); + client.executeQuery("CREATE TABLE default.table2 (age int);"); + } + + private void verifyDataBaseAndTable() throws Exception { + client.existDatabase("default"); + client.existTable("default.table1"); + client.existTable("default.table2"); } }
