TAJO-1306: HAServiceUtil should not directly use HDFS. Closes #358
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4595375f Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4595375f Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4595375f Branch: refs/heads/index_support Commit: 4595375f7e6b62436e0d4bf88a8aef1ca680c726 Parents: 015913b Author: Hyunsik Choi <[email protected]> Authored: Wed Jan 28 09:23:20 2015 -0800 Committer: Hyunsik Choi <[email protected]> Committed: Wed Jan 28 09:23:20 2015 -0800 ---------------------------------------------------------------------- .../tajo/catalog/AbstractCatalogClient.java | 12 +- .../apache/tajo/client/DummyServiceTracker.java | 84 +++ .../apache/tajo/client/SessionConnection.java | 44 +- .../org/apache/tajo/client/TajoClientImpl.java | 38 +- .../apache/tajo/client/TajoHAClientUtil.java | 14 +- .../java/org/apache/tajo/conf/TajoConf.java | 7 +- .../java/org/apache/tajo/ha/HAServiceUtil.java | 39 -- .../apache/tajo/service/BaseServiceTracker.java | 97 ++++ .../apache/tajo/service/HAServiceTracker.java | 48 ++ .../org/apache/tajo/service/ServiceTracker.java | 63 ++ .../tajo/service/ServiceTrackerException.java | 30 + .../tajo/service/ServiceTrackerFactory.java | 41 ++ .../org/apache/tajo/service/TajoMasterInfo.java | 89 +++ .../org/apache/tajo/benchmark/BenchmarkSet.java | 15 +- .../main/java/org/apache/tajo/ha/HAService.java | 56 -- .../org/apache/tajo/ha/HAServiceHDFSImpl.java | 316 ---------- .../org/apache/tajo/ha/HdfsServiceTracker.java | 576 +++++++++++++++++++ .../java/org/apache/tajo/ha/TajoMasterInfo.java | 89 --- .../apache/tajo/master/TajoContainerProxy.java | 27 +- .../java/org/apache/tajo/master/TajoMaster.java | 30 +- .../apache/tajo/querymaster/QueryMaster.java | 66 +-- .../main/java/org/apache/tajo/util/JSPUtil.java | 12 +- .../tajo/worker/TajoResourceAllocator.java | 28 +- .../java/org/apache/tajo/worker/TajoWorker.java | 46 +- .../tajo/worker/TajoWorkerClientService.java | 2 +- .../tajo/worker/WorkerHeartbeatService.java | 20 +- .../ConnectivityCheckerRuleForTajoWorker.java | 11 +- .../resources/webapps/admin/catalogview.jsp | 5 +- .../main/resources/webapps/admin/cluster.jsp | 7 +- .../src/main/resources/webapps/admin/index.jsp | 7 +- .../resources/webapps/admin/query_executor.jsp | 5 +- .../apache/tajo/ha/TestHAServiceHDFSImpl.java | 10 +- .../org/apache/tajo/jdbc/JdbcConnection.java | 10 +- .../org/apache/tajo/jdbc/TajoStatement.java | 4 +- .../org/apache/tajo/storage/StorageUtil.java | 16 - 35 files changed, 1156 insertions(+), 808 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java index 1a2fd44..718f7d6 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java @@ -29,12 +29,13 @@ import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto; +import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.ProtoUtil; import java.net.InetSocketAddress; @@ -48,6 +49,7 @@ import java.util.List; public abstract class AbstractCatalogClient implements CatalogService { private final Log LOG = LogFactory.getLog(AbstractCatalogClient.class); + protected ServiceTracker serviceTracker; protected RpcConnectionPool pool; protected InetSocketAddress catalogServerAddr; protected TajoConf conf; @@ -57,6 +59,7 @@ public abstract class AbstractCatalogClient implements CatalogService { public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) { this.pool = RpcConnectionPool.getPool(); this.catalogServerAddr = catalogServerAddr; + this.serviceTracker = ServiceTrackerFactory.get(conf); this.conf = conf; } @@ -64,14 +67,11 @@ public abstract class AbstractCatalogClient implements CatalogService { if (catalogServerAddr == null) { return null; } else { + if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { return catalogServerAddr; } else { - if (!HAServiceUtil.isMasterAlive(catalogServerAddr, conf)) { - return HAServiceUtil.getCatalogAddress(conf); - } else { - return catalogServerAddr; - } + return serviceTracker.getCatalogAddress(); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java b/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java new file mode 100644 index 0000000..762c2e7 --- /dev/null +++ b/tajo-client/src/main/java/org/apache/tajo/client/DummyServiceTracker.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.client; + +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.service.ServiceTrackerException; +import org.apache.tajo.service.TajoMasterInfo; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; + +public class DummyServiceTracker implements ServiceTracker { + private InetSocketAddress address; + + public DummyServiceTracker(InetSocketAddress address) { + this.address = address; + } + + @Override + public boolean isHighAvailable() { + return false; + } + + @Override + public InetSocketAddress getUmbilicalAddress() { + throw new UnsupportedException(); + } + + @Override + public InetSocketAddress getClientServiceAddress() { + return address; + } + + @Override + public InetSocketAddress getResourceTrackerAddress() { + throw new UnsupportedException(); + } + + @Override + public InetSocketAddress getCatalogAddress() { + throw new UnsupportedException(); + } + + @Override + public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException { + throw new UnsupportedException(); + } + + @Override + public void register() throws IOException { + } + + @Override + public void delete() throws IOException { + } + + @Override + public boolean isActiveStatus() { + return true; + } + + @Override + public List<TajoMasterInfo> getMasters() throws IOException { + throw new UnsupportedException(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index 5490be4..3e2b9cc 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -21,12 +21,10 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.tajo.TajoIdProtos; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.auth.UserRoleInfo; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.ResultCode; import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse; @@ -34,6 +32,7 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.ServerCallable; +import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; @@ -58,8 +57,6 @@ public class SessionConnection implements Closeable { private final TajoConf conf; - final InetSocketAddress tajoMasterAddr; - final RpcConnectionPool connPool; private final String baseDatabase; @@ -73,41 +70,29 @@ public class SessionConnection implements Closeable { /** session variable cache */ private final Map<String, String> sessionVarsCache = new HashMap<String, String>(); - - public SessionConnection(TajoConf conf) throws IOException { - this(conf, NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null); - } - - public SessionConnection(TajoConf conf, @Nullable String baseDatabase) throws IOException { - this(conf, NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), baseDatabase); - } - - public SessionConnection(InetSocketAddress addr) throws IOException { - this(new TajoConf(), addr, null); - } - - public SessionConnection(String hostname, int port, String baseDatabase) throws IOException { - this(new TajoConf(), NetUtils.createSocketAddr(hostname, port), baseDatabase); - } + private ServiceTracker serviceTracker; /** * Connect to TajoMaster * * @param conf TajoConf - * @param addr TajoMaster address + * @param tracker TajoMaster address * @param baseDatabase The base database name. It is case sensitive. If it is null, * the 'default' database will be used. * @throws java.io.IOException */ - public SessionConnection(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException { + public SessionConnection(TajoConf conf, ServiceTracker tracker, @Nullable String baseDatabase) + throws IOException { + this.conf = conf; this.conf.set("tajo.disk.scheduler.report.interval", "0"); - this.tajoMasterAddr = addr; int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM); // Don't share connection pool per client connPool = RpcConnectionPool.newPool(getClass().getSimpleName(), workerNum); userInfo = UserRoleInfo.getCurrentUser(); this.baseDatabase = baseDatabase != null ? baseDatabase : null; + + this.serviceTracker = tracker; } public Map<String, String> getClientSideSessionVars() { @@ -140,7 +125,8 @@ public class SessionConnection implements Closeable { public boolean isConnected() { if(!closed.get()){ try { - return connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false).isConnected(); + return connPool.getConnection(serviceTracker.getClientServiceAddress(), + TajoMasterClientProtocol.class, false).isConnected(); } catch (Throwable e) { return false; } @@ -309,15 +295,7 @@ public class SessionConnection implements Closeable { } protected InetSocketAddress getTajoMasterAddr() { - if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - return tajoMasterAddr; - } else { - if (!HAServiceUtil.isMasterAlive(tajoMasterAddr, conf)) { - return HAServiceUtil.getMasterClientAddress(conf); - } else { - return tajoMasterAddr; - } - } + return serviceTracker.getClientServiceAddress(); } protected void checkSessionAndGet(NettyClientBase client) throws ServiceException { http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java index 8eafc91..f8eef28 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java @@ -32,8 +32,6 @@ import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.jdbc.TajoResultSet; @@ -41,7 +39,8 @@ import org.apache.tajo.rule.EvaluationContext; import org.apache.tajo.rule.EvaluationFailedException; import org.apache.tajo.rule.SelfDiagnosisRuleEngine; import org.apache.tajo.rule.SelfDiagnosisRuleSession; -import org.apache.tajo.util.NetUtils; +import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.service.ServiceTrackerFactory; import java.io.IOException; import java.net.InetSocketAddress; @@ -56,41 +55,30 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que QueryClient queryClient; CatalogAdminClient catalogClient; - public TajoClientImpl(TajoConf conf) throws IOException { - this(conf, TajoHAClientUtil.getRpcClientAddress(conf), null); - } - - public TajoClientImpl(TajoConf conf, @Nullable String baseDatabase) throws IOException { - this(conf, TajoHAClientUtil.getRpcClientAddress(conf), baseDatabase); - } - - public TajoClientImpl(InetSocketAddress addr) throws IOException { - this(new TajoConf(), addr, null); - } - /** * Connect to TajoMaster * * @param conf TajoConf - * @param addr TajoMaster address + * @param tracker ServiceTracker to discovery Tajo Client RPC * @param baseDatabase The base database name. It is case sensitive. If it is null, * the 'default' database will be used. * @throws java.io.IOException */ - public TajoClientImpl(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException { - super(conf, addr, baseDatabase); + public TajoClientImpl(TajoConf conf, ServiceTracker tracker, @Nullable String baseDatabase) throws IOException { + super(conf, tracker, baseDatabase); + this.queryClient = new QueryClientImpl(this); this.catalogClient = new CatalogAdminClientImpl(this); - + diagnoseTajoClient(); } - public TajoClientImpl(String hostName, int port, @Nullable String baseDatabase) throws IOException { - super(hostName, port, baseDatabase); - this.queryClient = new QueryClientImpl(this); - this.catalogClient = new CatalogAdminClientImpl(this); - - diagnoseTajoClient(); + public TajoClientImpl(TajoConf conf) throws IOException { + this(conf, ServiceTrackerFactory.get(conf), null); + } + + public TajoClientImpl(TajoConf conf, @Nullable String baseDatabase) throws IOException { + this(conf, ServiceTrackerFactory.get(conf), baseDatabase); } private void diagnoseTajoClient() throws EvaluationFailedException { http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java index 12a9ec8..7267b10 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java @@ -40,10 +40,8 @@ import com.google.protobuf.ServiceException; import org.apache.tajo.cli.tsql.TajoCli.TajoCliContext; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ha.HAServiceUtil; -import org.apache.tajo.util.NetUtils; import java.io.IOException; -import java.net.InetSocketAddress; public class TajoHAClientUtil { /** @@ -65,6 +63,7 @@ public class TajoHAClientUtil { TajoCliContext context) throws IOException, ServiceException { if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + if (!HAServiceUtil.isMasterAlive(conf.getVar( TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS), conf)) { TajoClient tajoClient = null; @@ -85,15 +84,4 @@ public class TajoHAClientUtil { return client; } } - - - public static InetSocketAddress getRpcClientAddress(TajoConf conf) { - if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - return NetUtils.createSocketAddr(HAServiceUtil.getMasterClientName(conf)); - } else { - return NetUtils.createSocketAddr(conf.getVar(TajoConf.ConfVars - .TAJO_MASTER_CLIENT_RPC_ADDRESS)); - } - } - } http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 1bb96bc..fe5ff54 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.ConfigKey; import org.apache.tajo.SessionVars; import org.apache.tajo.TajoConstants; +import org.apache.tajo.service.BaseServiceTracker; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.NumberUtil; import org.apache.tajo.util.TUtil; @@ -134,10 +135,14 @@ public class TajoConf extends Configuration { Validators.networkAddr()), TAJO_MASTER_INFO_ADDRESS("tajo.master.info-http.address", "0.0.0.0:26080", Validators.networkAddr()), - // Tajo Master HA Configurations + // High availability configurations TAJO_MASTER_HA_ENABLE("tajo.master.ha.enable", false, Validators.bool()), TAJO_MASTER_HA_MONITOR_INTERVAL("tajo.master.ha.monitor.interval", 5 * 1000), // 5 sec + // Service discovery + DEFAULT_SERVICE_TRACKER_CLASS("tajo.discovery.service-tracker.class", BaseServiceTracker.class.getCanonicalName()), + HA_SERVICE_TRACKER_CLASS("tajo.discovery.ha-service-tracker.class", "org.apache.tajo.ha.HdfsServiceTracker"), + // Resource tracker service RESOURCE_TRACKER_RPC_ADDRESS("tajo.resource-tracker.rpc.address", "localhost:26003", Validators.networkAddr()), http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java b/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java index 52c2ade..7001228 100644 --- a/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/ha/HAServiceUtil.java @@ -18,8 +18,6 @@ package org.apache.tajo.ha; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.*; import org.apache.tajo.TajoConstants; import org.apache.tajo.conf.TajoConf; @@ -34,15 +32,6 @@ import java.util.ArrayList; import java.util.List; public class HAServiceUtil { - private static Log LOG = LogFactory.getLog(HAServiceUtil.class); - - public static InetSocketAddress getMasterUmbilicalAddress(TajoConf conf) { - return getMasterAddress(conf, HAConstants.MASTER_UMBILICAL_RPC_ADDRESS); - } - - public static String getMasterUmbilicalName(TajoConf conf) { - return NetUtils.normalizeInetSocketAddress(getMasterUmbilicalAddress(conf)); - } public static InetSocketAddress getMasterClientAddress(TajoConf conf) { return getMasterAddress(conf, HAConstants.MASTER_CLIENT_RPC_ADDRESS); @@ -52,30 +41,6 @@ public class HAServiceUtil { return NetUtils.normalizeInetSocketAddress(getMasterClientAddress(conf)); } - public static InetSocketAddress getResourceTrackerAddress(TajoConf conf) { - return getMasterAddress(conf, HAConstants.RESOURCE_TRACKER_RPC_ADDRESS); - } - - public static String getResourceTrackerName(TajoConf conf) { - return NetUtils.normalizeInetSocketAddress(getResourceTrackerAddress(conf)); - } - - public static InetSocketAddress getCatalogAddress(TajoConf conf) { - return getMasterAddress(conf, HAConstants.CATALOG_ADDRESS); - } - - public static String getCatalogName(TajoConf conf) { - return NetUtils.normalizeInetSocketAddress(getCatalogAddress(conf)); - } - - public static InetSocketAddress getMasterInfoAddress(TajoConf conf) { - return getMasterAddress(conf, HAConstants.MASTER_INFO_ADDRESS); - } - - public static String getMasterInfoName(TajoConf conf) { - return NetUtils.normalizeInetSocketAddress(getMasterInfoAddress(conf)); - } - public static InetSocketAddress getMasterAddress(TajoConf conf, int type) { InetSocketAddress masterAddress = null; @@ -153,10 +118,6 @@ public class HAServiceUtil { return masterAddress; } - public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) { - return isMasterAlive(NetUtils.normalizeInetSocketAddress(masterAddress), conf); - } - public static boolean isMasterAlive(String masterName, TajoConf conf) { boolean isAlive = true; http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java new file mode 100644 index 0000000..bf7fd2c --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/service/BaseServiceTracker.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.service; + +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.util.TUtil; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; + +public class BaseServiceTracker implements ServiceTracker { + private final TajoConf conf; + private TajoMasterInfo tajoMasterInfo; + private List<TajoMasterInfo> tajoMasterInfos; + + @SuppressWarnings("unused") + public BaseServiceTracker(TajoConf conf) { + this.conf = conf; + + tajoMasterInfo = new TajoMasterInfo(); + tajoMasterInfo.setActive(true); + tajoMasterInfo.setAvailable(true); + tajoMasterInfo.setTajoMasterAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS)); + tajoMasterInfo.setTajoClientAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)); + tajoMasterInfo.setWorkerResourceTrackerAddr(conf.getSocketAddrVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS)); + tajoMasterInfo.setCatalogAddress(conf.getSocketAddrVar(TajoConf.ConfVars.CATALOG_ADDRESS)); + tajoMasterInfo.setWebServerAddress(conf.getSocketAddrVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS)); + + tajoMasterInfos = TUtil.newList(tajoMasterInfo); + } + + @Override + public boolean isHighAvailable() { + return false; + } + + @Override + public InetSocketAddress getUmbilicalAddress() { + return tajoMasterInfo.getTajoMasterAddress(); + } + + @Override + public InetSocketAddress getClientServiceAddress() { + return tajoMasterInfo.getTajoClientAddress(); + } + + @Override + public InetSocketAddress getResourceTrackerAddress() { + return tajoMasterInfo.getWorkerResourceTrackerAddr(); + } + + @Override + public InetSocketAddress getCatalogAddress() { + return tajoMasterInfo.getCatalogAddress(); + } + + @Override + public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException { + return tajoMasterInfo.getWebServerAddress(); + } + + @Override + public void register() throws IOException { + } + + @Override + public void delete() throws IOException { + } + + @Override + public boolean isActiveStatus() { + return true; + } + + @Override + public List<TajoMasterInfo> getMasters() throws IOException { + return tajoMasterInfos; + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java new file mode 100644 index 0000000..c808537 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/service/HAServiceTracker.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.service; + +import org.apache.hadoop.net.NetUtils; + +import javax.net.SocketFactory; +import java.net.InetSocketAddress; +import java.net.Socket; + +public abstract class HAServiceTracker implements ServiceTracker { + + static SocketFactory socketFactory = SocketFactory.getDefault(); + + public boolean isHighAvailable() { + return true; + } + + public static boolean checkConnection(InetSocketAddress address) { + boolean isAlive = true; + + try { + int connectionTimeout = 10; + + Socket socket = socketFactory.createSocket(); + NetUtils.connect(socket, address, connectionTimeout); + } catch (Exception e) { + isAlive = false; + } + return isAlive; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java new file mode 100644 index 0000000..73ff112 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTracker.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.service; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; + +public interface ServiceTracker { + + public abstract boolean isHighAvailable(); + + public abstract InetSocketAddress getUmbilicalAddress() throws ServiceTrackerException; + + public abstract InetSocketAddress getClientServiceAddress() throws ServiceTrackerException; + + public abstract InetSocketAddress getResourceTrackerAddress() throws ServiceTrackerException; + + public abstract InetSocketAddress getCatalogAddress() throws ServiceTrackerException; + + public abstract InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException; + + /** + * Add master name to shared storage. + */ + public void register() throws IOException; + + + /** + * Delete master name to shared storage. + * + */ + public void delete() throws IOException; + + /** + * + * @return True if current master is an active master. + */ + public boolean isActiveStatus(); + + /** + * + * @return return all master list + * @throws IOException + */ + public List<TajoMasterInfo> getMasters() throws IOException; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java new file mode 100644 index 0000000..3407c51 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerException.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.service; + +public class ServiceTrackerException extends RuntimeException { + + public ServiceTrackerException(Throwable t) { + super(t); + } + + public ServiceTrackerException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java new file mode 100644 index 0000000..5828055 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/service/ServiceTrackerFactory.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.service; + +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.util.ReflectionUtil; + +public class ServiceTrackerFactory { + + public static ServiceTracker get(TajoConf conf) { + Class<ServiceTracker> trackerClass; + + try { + if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + trackerClass = (Class<ServiceTracker>) conf.getClassVar(TajoConf.ConfVars.HA_SERVICE_TRACKER_CLASS); + } else { + trackerClass = (Class<ServiceTracker>) conf.getClassVar(TajoConf.ConfVars.DEFAULT_SERVICE_TRACKER_CLASS); + } + return ReflectionUtil.newInstance(trackerClass, conf); + + } catch (Throwable t) { + throw new RuntimeException(t); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java b/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java new file mode 100644 index 0000000..481b528 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/service/TajoMasterInfo.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.service; + +import java.net.InetSocketAddress; + +public class TajoMasterInfo { + + private boolean available; + private boolean isActive; + + private InetSocketAddress tajoMasterAddress; + private InetSocketAddress tajoClientAddress; + private InetSocketAddress workerResourceTrackerAddr; + private InetSocketAddress catalogAddress; + private InetSocketAddress webServerAddress; + + public InetSocketAddress getTajoMasterAddress() { + return tajoMasterAddress; + } + + public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) { + this.tajoMasterAddress = tajoMasterAddress; + } + + public InetSocketAddress getTajoClientAddress() { + return tajoClientAddress; + } + + public void setTajoClientAddress(InetSocketAddress tajoClientAddress) { + this.tajoClientAddress = tajoClientAddress; + } + + public InetSocketAddress getWorkerResourceTrackerAddr() { + return workerResourceTrackerAddr; + } + + public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) { + this.workerResourceTrackerAddr = workerResourceTrackerAddr; + } + + public InetSocketAddress getCatalogAddress() { + return catalogAddress; + } + + public void setCatalogAddress(InetSocketAddress catalogAddress) { + this.catalogAddress = catalogAddress; + } + + public InetSocketAddress getWebServerAddress() { + return webServerAddress; + } + + public void setWebServerAddress(InetSocketAddress webServerAddress) { + this.webServerAddress = webServerAddress; + } + + public boolean isAvailable() { + return available; + } + + public void setAvailable(boolean available) { + this.available = available; + } + + public boolean isActive() { + return isActive; + } + + public void setActive(boolean active) { + isActive = active; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java index b1b6450..0304e92 100644 --- a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java +++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java @@ -18,19 +18,23 @@ package org.apache.tajo.benchmark; +import com.google.common.base.Preconditions; import com.google.protobuf.ServiceException; import org.apache.hadoop.net.NetUtils; import org.apache.tajo.catalog.CatalogConstants; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.store.MemStore; +import org.apache.tajo.client.DummyServiceTracker; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.FileUtil; import java.io.File; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; @@ -43,9 +47,14 @@ public abstract class BenchmarkSet { public void init(TajoConf conf, String dataDir) throws IOException { this.dataDir = dataDir; - if (System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname) != null) { - tajo = new TajoClientImpl(NetUtils.createSocketAddr( - System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname))); + + if (System.getProperty(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS.varname) != null) { + + String addressStr = System.getProperty(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS.varname); + InetSocketAddress addr = NetUtils.createSocketAddr(addressStr); + ServiceTracker serviceTracker = new DummyServiceTracker(addr); + tajo = new TajoClientImpl(conf, serviceTracker, null); + } else { conf.set(CatalogConstants.STORE_CLASS, MemStore.class.getCanonicalName()); tajo = new TajoClientImpl(conf); http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java b/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java deleted file mode 100644 index 1329223..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/ha/HAService.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.ha; - -import java.io.IOException; -import java.util.List; - -/** - * The HAService is responsible for setting active TajoMaster on startup or when the - * current active is changing (eg due to failure), monitoring the health of TajoMaster. - * - */ -public interface HAService { - - /** - * Add master name to shared storage. - */ - public void register() throws IOException; - - - /** - * Delete master name to shared storage. - * - */ - public void delete() throws IOException; - - /** - * - * @return True if current master is an active master. - */ - public boolean isActiveStatus(); - - /** - * - * @return return all master list - * @throws IOException - */ - public List<TajoMasterInfo> getMasters() throws IOException; - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java b/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java deleted file mode 100644 index e18a9b2..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/ha/HAServiceHDFSImpl.java +++ /dev/null @@ -1,316 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.ha; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.net.NetUtils; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.master.TajoMaster.MasterContext; -import org.apache.tajo.util.TUtil; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.List; - -/** - * This implements HAService utilizing HDFS cluster. This saves master status to HDFS cluster. - * - */ -public class HAServiceHDFSImpl implements HAService { - private static Log LOG = LogFactory.getLog(HAServiceHDFSImpl.class); - - private MasterContext context; - private TajoConf conf; - - private FileSystem fs; - - private String masterName; - private Path rootPath; - private Path haPath; - private Path activePath; - private Path backupPath; - - private boolean isActiveStatus = false; - - //thread which runs periodically to see the last time since a heartbeat is received. - private Thread checkerThread; - private volatile boolean stopped = false; - - private int monitorInterval; - - private String currentActiveMaster; - - public HAServiceHDFSImpl(MasterContext context) throws IOException { - this.context = context; - this.conf = context.getConf(); - initSystemDirectory(); - - InetSocketAddress socketAddress = context.getTajoMasterService().getBindAddress(); - this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort(); - - monitorInterval = conf.getIntVar(TajoConf.ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL); - } - - private void initSystemDirectory() throws IOException { - // Get Tajo root dir - this.rootPath = TajoConf.getTajoRootDir(conf); - - // Check Tajo root dir - this.fs = rootPath.getFileSystem(conf); - - // Check and create Tajo system HA dir - haPath = TajoConf.getSystemHADir(conf); - if (!fs.exists(haPath)) { - fs.mkdirs(haPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION)); - LOG.info("System HA dir '" + haPath + "' is created"); - } - - activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); - if (!fs.exists(activePath)) { - fs.mkdirs(activePath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION)); - LOG.info("System HA Active dir '" + activePath + "' is created"); - } - - backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); - if (!fs.exists(backupPath)) { - fs.mkdirs(backupPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION)); - LOG.info("System HA Backup dir '" + backupPath + "' is created"); - } - } - - private void startPingChecker() { - if (checkerThread == null) { - checkerThread = new Thread(new PingChecker()); - checkerThread.setName("Ping Checker"); - checkerThread.start(); - } - } - - @Override - public void register() throws IOException { - FileStatus[] files = fs.listStatus(activePath); - - // Phase 1: If there is not another active master, this try to become active master. - if (files.length == 0) { - createMasterFile(true); - currentActiveMaster = masterName; - LOG.info(String.format("This is added to active master (%s)", masterName)); - } else { - // Phase 2: If there is active master information, we need to check its status. - Path activePath = files[0].getPath(); - currentActiveMaster = activePath.getName().replaceAll("_", ":"); - - // Phase 3: If current active master is dead, this master should be active master. - if (!HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) { - fs.delete(activePath, true); - createMasterFile(true); - currentActiveMaster = masterName; - LOG.info(String.format("This is added to active master (%s)", masterName)); - } else { - // Phase 4: If current active master is alive, this master need to be backup master. - createMasterFile(false); - LOG.info(String.format("This is added to backup masters (%s)", masterName)); - } - } - } - - private void createMasterFile(boolean isActive) throws IOException { - String fileName = masterName.replaceAll(":", "_"); - Path path = null; - - if (isActive) { - path = new Path(activePath, fileName); - } else { - path = new Path(backupPath, fileName); - } - - StringBuilder sb = new StringBuilder(); - InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS); - sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); - - address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS); - sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); - - address = getHostAddress(HAConstants.CATALOG_ADDRESS); - sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); - - address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS); - sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()); - - FSDataOutputStream out = fs.create(path); - - try { - out.writeUTF(sb.toString()); - out.hflush(); - out.close(); - } catch (FileAlreadyExistsException e) { - createMasterFile(false); - } - - if (isActive) { - isActiveStatus = true; - } else { - isActiveStatus = false; - } - - startPingChecker(); - } - - - private InetSocketAddress getHostAddress(int type) { - InetSocketAddress address = null; - - switch (type) { - case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS: - address = context.getConf().getSocketAddrVar(TajoConf.ConfVars - .TAJO_MASTER_UMBILICAL_RPC_ADDRESS); - break; - case HAConstants.MASTER_CLIENT_RPC_ADDRESS: - address = context.getConf().getSocketAddrVar(TajoConf.ConfVars - .TAJO_MASTER_CLIENT_RPC_ADDRESS); - break; - case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS: - address = context.getConf().getSocketAddrVar(TajoConf.ConfVars - .RESOURCE_TRACKER_RPC_ADDRESS); - break; - case HAConstants.CATALOG_ADDRESS: - address = context.getConf().getSocketAddrVar(TajoConf.ConfVars - .CATALOG_ADDRESS); - break; - case HAConstants.MASTER_INFO_ADDRESS: - address = context.getConf().getSocketAddrVar(TajoConf.ConfVars - .TAJO_MASTER_INFO_ADDRESS); - default: - break; - } - - return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort()); - } - - @Override - public void delete() throws IOException { - String fileName = masterName.replaceAll(":", "_"); - - Path activeFile = new Path(activePath, fileName); - if (fs.exists(activeFile)) { - fs.delete(activeFile, true); - } - - Path backupFile = new Path(backupPath, fileName); - if (fs.exists(backupFile)) { - fs.delete(backupFile, true); - } - if (isActiveStatus) { - isActiveStatus = false; - } - stopped = true; - } - - @Override - public boolean isActiveStatus() { - return isActiveStatus; - } - - @Override - public List<TajoMasterInfo> getMasters() throws IOException { - List<TajoMasterInfo> list = TUtil.newList(); - Path path = null; - - FileStatus[] files = fs.listStatus(activePath); - if (files.length == 1) { - path = files[0].getPath(); - list.add(createTajoMasterInfo(path, true)); - } - - files = fs.listStatus(backupPath); - for (FileStatus status : files) { - path = status.getPath(); - list.add(createTajoMasterInfo(path, false)); - } - - return list; - } - - private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException { - String masterAddress = path.getName().replaceAll("_", ":"); - boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf); - - FSDataInputStream stream = fs.open(path); - String data = stream.readUTF(); - - stream.close(); - - String[] addresses = data.split("_"); - TajoMasterInfo info = new TajoMasterInfo(); - - info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress)); - info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0])); - info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1])); - info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2])); - info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3])); - - info.setAvailable(isAlive); - info.setActive(isActive); - - return info; - } - - private class PingChecker implements Runnable { - @Override - public void run() { - while (!stopped && !Thread.currentThread().isInterrupted()) { - synchronized (HAServiceHDFSImpl.this) { - try { - if (!currentActiveMaster.equals(masterName)) { - boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf); - if (LOG.isDebugEnabled()) { - LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName - + ", isAlive:" + isAlive); - } - - // If active master is dead, this master should be active master instead of - // previous active master. - if (!isAlive) { - FileStatus[] files = fs.listStatus(activePath); - if (files.length == 0 || (files.length == 1 - && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) { - delete(); - register(); - } - } - } - } catch (Exception e) { - e.printStackTrace(); - } - } - try { - Thread.sleep(monitorInterval); - } catch (InterruptedException e) { - LOG.info("PingChecker interrupted. - masterName:" + masterName); - break; - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java new file mode 100644 index 0000000..1475a5d --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java @@ -0,0 +1,576 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.ha; + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.net.NetUtils; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.service.HAServiceTracker; +import org.apache.tajo.service.ServiceTrackerException; +import org.apache.tajo.service.TajoMasterInfo; +import org.apache.tajo.util.TUtil; + +import javax.net.SocketFactory; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; + +/** + * This implements HAService utilizing HDFS cluster. This saves master status to HDFS cluster. + * + */ +@SuppressWarnings("unused") +public class HdfsServiceTracker extends HAServiceTracker { + private static Log LOG = LogFactory.getLog(HdfsServiceTracker.class); + + private TajoConf conf; + + private FileSystem fs; + + private String masterName; + private Path rootPath; + private Path haPath; + private Path activePath; + private Path backupPath; + + private boolean isActiveStatus = false; + + //thread which runs periodically to see the last time since a heartbeat is received. + private Thread checkerThread; + private volatile boolean stopped = false; + + private int monitorInterval; + + private String currentActiveMaster; + + public HdfsServiceTracker(TajoConf conf) throws IOException { + this.conf = conf; + initSystemDirectory(); + + InetSocketAddress socketAddress = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS); + this.masterName = socketAddress.getAddress().getHostAddress() + ":" + socketAddress.getPort(); + + monitorInterval = conf.getIntVar(ConfVars.TAJO_MASTER_HA_MONITOR_INTERVAL); + } + + private void initSystemDirectory() throws IOException { + // Get Tajo root dir + this.rootPath = TajoConf.getTajoRootDir(conf); + + // Check Tajo root dir + this.fs = rootPath.getFileSystem(conf); + + // Check and create Tajo system HA dir + haPath = TajoConf.getSystemHADir(conf); + if (!fs.exists(haPath)) { + fs.mkdirs(haPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION)); + LOG.info("System HA dir '" + haPath + "' is created"); + } + + activePath = new Path(haPath, TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); + if (!fs.exists(activePath)) { + fs.mkdirs(activePath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION)); + LOG.info("System HA Active dir '" + activePath + "' is created"); + } + + backupPath = new Path(haPath, TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); + if (!fs.exists(backupPath)) { + fs.mkdirs(backupPath, new FsPermission(TajoMaster.SYSTEM_RESOURCE_DIR_PERMISSION)); + LOG.info("System HA Backup dir '" + backupPath + "' is created"); + } + } + + private void startPingChecker() { + if (checkerThread == null) { + checkerThread = new Thread(new PingChecker()); + checkerThread.setName("Ping Checker"); + checkerThread.start(); + } + } + + @Override + public void register() throws IOException { + FileStatus[] files = fs.listStatus(activePath); + + // Phase 1: If there is not another active master, this try to become active master. + if (files.length == 0) { + createMasterFile(true); + currentActiveMaster = masterName; + LOG.info(String.format("This is added to active master (%s)", masterName)); + } else { + // Phase 2: If there is active master information, we need to check its status. + Path activePath = files[0].getPath(); + currentActiveMaster = activePath.getName().replaceAll("_", ":"); + + // Phase 3: If current active master is dead, this master should be active master. + if (!HAServiceUtil.isMasterAlive(currentActiveMaster, conf)) { + fs.delete(activePath, true); + createMasterFile(true); + currentActiveMaster = masterName; + LOG.info(String.format("This is added to active master (%s)", masterName)); + } else { + // Phase 4: If current active master is alive, this master need to be backup master. + createMasterFile(false); + LOG.info(String.format("This is added to backup masters (%s)", masterName)); + } + } + } + + /** + * It will creates the following form string. It includes + * + * <pre> + * {CLIENT_RPC_HOST:PORT}_{RESOURCE_TRACKER_HOST:PORT}_{CATALOG_HOST:PORT}_{MASTER_WEB_HOST:PORT} + * </pre> + * + * @param isActive A boolean flag to indicate if it is for master or not. + * @throws IOException + */ + private void createMasterFile(boolean isActive) throws IOException { + String fileName = masterName.replaceAll(":", "_"); + Path path = null; + + if (isActive) { + path = new Path(activePath, fileName); + } else { + path = new Path(backupPath, fileName); + } + + StringBuilder sb = new StringBuilder(); + InetSocketAddress address = getHostAddress(HAConstants.MASTER_CLIENT_RPC_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); + + address = getHostAddress(HAConstants.RESOURCE_TRACKER_RPC_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); + + address = getHostAddress(HAConstants.CATALOG_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()).append("_"); + + address = getHostAddress(HAConstants.MASTER_INFO_ADDRESS); + sb.append(address.getAddress().getHostAddress()).append(":").append(address.getPort()); + + FSDataOutputStream out = fs.create(path); + + try { + out.writeUTF(sb.toString()); + out.hsync(); + out.close(); + } catch (FileAlreadyExistsException e) { + createMasterFile(false); + } + + if (isActive) { + isActiveStatus = true; + } else { + isActiveStatus = false; + } + + startPingChecker(); + } + + + private InetSocketAddress getHostAddress(int type) { + InetSocketAddress address = null; + + switch (type) { + case HAConstants.MASTER_UMBILICAL_RPC_ADDRESS: + address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS); + break; + case HAConstants.MASTER_CLIENT_RPC_ADDRESS: + address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS); + break; + case HAConstants.RESOURCE_TRACKER_RPC_ADDRESS: + address = conf.getSocketAddrVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS); + break; + case HAConstants.CATALOG_ADDRESS: + address = conf.getSocketAddrVar(ConfVars.CATALOG_ADDRESS); + break; + case HAConstants.MASTER_INFO_ADDRESS: + address = conf.getSocketAddrVar(ConfVars.TAJO_MASTER_INFO_ADDRESS); + default: + break; + } + + return NetUtils.createSocketAddr(masterName.split(":")[0] + ":" + address.getPort()); + } + + @Override + public void delete() throws IOException { + String fileName = masterName.replaceAll(":", "_"); + + Path activeFile = new Path(activePath, fileName); + if (fs.exists(activeFile)) { + fs.delete(activeFile, true); + } + + Path backupFile = new Path(backupPath, fileName); + if (fs.exists(backupFile)) { + fs.delete(backupFile, true); + } + if (isActiveStatus) { + isActiveStatus = false; + } + stopped = true; + } + + @Override + public boolean isActiveStatus() { + return isActiveStatus; + } + + @Override + public List<TajoMasterInfo> getMasters() throws IOException { + List<TajoMasterInfo> list = TUtil.newList(); + Path path = null; + + FileStatus[] files = fs.listStatus(activePath); + if (files.length == 1) { + path = files[0].getPath(); + list.add(createTajoMasterInfo(path, true)); + } + + files = fs.listStatus(backupPath); + for (FileStatus status : files) { + path = status.getPath(); + list.add(createTajoMasterInfo(path, false)); + } + + return list; + } + + private TajoMasterInfo createTajoMasterInfo(Path path, boolean isActive) throws IOException { + String masterAddress = path.getName().replaceAll("_", ":"); + boolean isAlive = HAServiceUtil.isMasterAlive(masterAddress, conf); + + FSDataInputStream stream = fs.open(path); + String data = stream.readUTF(); + + stream.close(); + + String[] addresses = data.split("_"); + TajoMasterInfo info = new TajoMasterInfo(); + + info.setTajoMasterAddress(NetUtils.createSocketAddr(masterAddress)); + info.setTajoClientAddress(NetUtils.createSocketAddr(addresses[0])); + info.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(addresses[1])); + info.setCatalogAddress(NetUtils.createSocketAddr(addresses[2])); + info.setWebServerAddress(NetUtils.createSocketAddr(addresses[3])); + + info.setAvailable(isAlive); + info.setActive(isActive); + + return info; + } + + private class PingChecker implements Runnable { + @Override + public void run() { + while (!stopped && !Thread.currentThread().isInterrupted()) { + synchronized (HdfsServiceTracker.this) { + try { + if (!currentActiveMaster.equals(masterName)) { + boolean isAlive = HAServiceUtil.isMasterAlive(currentActiveMaster, conf); + if (LOG.isDebugEnabled()) { + LOG.debug("currentActiveMaster:" + currentActiveMaster + ", thisMasterName:" + masterName + + ", isAlive:" + isAlive); + } + + // If active master is dead, this master should be active master instead of + // previous active master. + if (!isAlive) { + FileStatus[] files = fs.listStatus(activePath); + if (files.length == 0 || (files.length == 1 + && currentActiveMaster.equals(files[0].getPath().getName().replaceAll("_", ":")))) { + delete(); + register(); + } + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + try { + Thread.sleep(monitorInterval); + } catch (InterruptedException e) { + LOG.info("PingChecker interrupted. - masterName:" + masterName); + break; + } + } + } + } + + private final static int MASTER_UMBILICAL_RPC_ADDRESS = 0; + private final static int MASTER_CLIENT_RPC_ADDRESS = 1; + private final static int RESOURCE_TRACKER_RPC_ADDRESS = 2; + private final static int CATALOG_ADDRESS = 3; + private final static int MASTER_HTTP_INFO = 4; + + private volatile InetSocketAddress umbilicalRpcAddr; + private volatile InetSocketAddress clientRpcAddr; + private volatile InetSocketAddress resourceTrackerRpcAddr; + private volatile InetSocketAddress catalogAddr; + private volatile InetSocketAddress masterHttpInfoAddr; + + @Override + public InetSocketAddress getUmbilicalAddress() { + if (!checkConnection(umbilicalRpcAddr)) { + umbilicalRpcAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(MASTER_UMBILICAL_RPC_ADDRESS)); + } + + return umbilicalRpcAddr; + } + + @Override + public InetSocketAddress getClientServiceAddress() { + if (!checkConnection(clientRpcAddr)) { + clientRpcAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(MASTER_CLIENT_RPC_ADDRESS)); + } + + return clientRpcAddr; + } + + @Override + public InetSocketAddress getResourceTrackerAddress() { + if (!checkConnection(resourceTrackerRpcAddr)) { + resourceTrackerRpcAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(RESOURCE_TRACKER_RPC_ADDRESS)); + } + + return resourceTrackerRpcAddr; + } + + @Override + public InetSocketAddress getCatalogAddress() { + if (!checkConnection(catalogAddr)) { + catalogAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(CATALOG_ADDRESS)); + } + + return catalogAddr; + } + + @Override + public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException { + if (!checkConnection(masterHttpInfoAddr)) { + masterHttpInfoAddr = NetUtils.createSocketAddr(getAddressElements(conf).get(MASTER_HTTP_INFO)); + } + + return masterHttpInfoAddr; + } + + /** + * Reads a text file stored in HDFS file, and then return all service addresses read from a HDFS file. * + * + * @param conf + * @return all service addresses + * @throws ServiceTrackerException + */ + private static List<String> getAddressElements(TajoConf conf) throws ServiceTrackerException { + + try { + FileSystem fs = getFileSystem(conf); + Path activeMasterBaseDir = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); + + if (!fs.exists(activeMasterBaseDir)) { + throw new ServiceTrackerException("No such active master base path: " + activeMasterBaseDir); + } + if (!fs.isDirectory(activeMasterBaseDir)) { + throw new ServiceTrackerException("Active master base path must be a directory."); + } + + FileStatus[] files = fs.listStatus(activeMasterBaseDir); + + if (files.length < 1) { + throw new ServiceTrackerException("No active master entry"); + } else if (files.length > 1) { + throw new ServiceTrackerException("Two or more than active master entries."); + } + + // We can ensure that there is only one file due to the above assertion. + Path activeMasterEntry = files[0].getPath(); + + if (!fs.isFile(activeMasterEntry)) { + throw new ServiceTrackerException("Active master entry must be a file, but it is a directory."); + } + + List<String> addressElements = TUtil.newList(); + + addressElements.add(activeMasterEntry.getName().replaceAll("_", ":")); // Add UMBILICAL_RPC_ADDRESS to elements + + FSDataInputStream stream = fs.open(activeMasterEntry); + String data = stream.readUTF(); + stream.close(); + + addressElements.addAll(TUtil.newList(data.split("_"))); // Add remains entries to elements + + // ensure the number of entries + Preconditions.checkState(addressElements.size() == 5, "Fewer service addresses than necessary."); + + return addressElements; + + } catch (Throwable t) { + throw new ServiceTrackerException(t); + } + } + + + public static boolean isMasterAlive(InetSocketAddress masterAddress, TajoConf conf) { + return isMasterAlive(org.apache.tajo.util.NetUtils.normalizeInetSocketAddress(masterAddress), conf); + } + + public static boolean isMasterAlive(String masterName, TajoConf conf) { + boolean isAlive = true; + + try { + // how to create sockets + SocketFactory socketFactory = org.apache.hadoop.net.NetUtils.getDefaultSocketFactory(conf); + + int connectionTimeout = conf.getInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_KEY, + CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT); + + InetSocketAddress server = org.apache.hadoop.net.NetUtils.createSocketAddr(masterName); + + // connected socket + Socket socket = socketFactory.createSocket(); + org.apache.hadoop.net.NetUtils.connect(socket, server, connectionTimeout); + } catch (Exception e) { + isAlive = false; + } + return isAlive; + } + + public static int getState(String masterName, TajoConf conf) { + String targetMaster = masterName.replaceAll(":", "_"); + int retValue = -1; + + try { + FileSystem fs = getFileSystem(conf); + Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); + Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); + + Path temPath = null; + + // Check backup masters + FileStatus[] files = fs.listStatus(backupPath); + for (FileStatus status : files) { + temPath = status.getPath(); + if (temPath.getName().equals(targetMaster)) { + return 0; + } + } + + // Check active master + files = fs.listStatus(activePath); + if (files.length == 1) { + temPath = files[0].getPath(); + if (temPath.getName().equals(targetMaster)) { + return 1; + } + } + retValue = -2; + } catch (Exception e) { + e.printStackTrace(); + } + return retValue; + } + + public static int formatHA(TajoConf conf) { + int retValue = -1; + try { + FileSystem fs = getFileSystem(conf); + Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); + Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); + Path temPath = null; + + int aliveMasterCount = 0; + // Check backup masters + FileStatus[] files = fs.listStatus(backupPath); + for (FileStatus status : files) { + temPath = status.getPath(); + if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) { + aliveMasterCount++; + } + } + + // Check active master + files = fs.listStatus(activePath); + if (files.length == 1) { + temPath = files[0].getPath(); + if (isMasterAlive(temPath.getName().replaceAll("_", ":"), conf)) { + aliveMasterCount++; + } + } + + // If there is any alive master, users can't format storage. + if (aliveMasterCount > 0) { + return 0; + } + + // delete ha path. + fs.delete(TajoConf.getSystemHADir(conf), true); + retValue = 1; + } catch (Exception e) { + e.printStackTrace(); + } + return retValue; + } + + + public static List<String> getMasters(TajoConf conf) { + List<String> list = new ArrayList<String>(); + + try { + FileSystem fs = getFileSystem(conf); + Path activePath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_ACTIVE_DIR_NAME); + Path backupPath = new Path(TajoConf.getSystemHADir(conf), TajoConstants.SYSTEM_HA_BACKUP_DIR_NAME); + Path temPath = null; + + // Check backup masters + FileStatus[] files = fs.listStatus(backupPath); + for (FileStatus status : files) { + temPath = status.getPath(); + list.add(temPath.getName().replaceAll("_", ":")); + } + + // Check active master + files = fs.listStatus(activePath); + if (files.length == 1) { + temPath = files[0].getPath(); + list.add(temPath.getName().replaceAll("_", ":")); + } + + } catch (Exception e) { + e.printStackTrace(); + } + return list; + } + + private static FileSystem getFileSystem(TajoConf conf) throws IOException { + Path rootPath = TajoConf.getTajoRootDir(conf); + return rootPath.getFileSystem(conf); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java b/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java deleted file mode 100644 index c6fdd40..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/ha/TajoMasterInfo.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.ha; - -import java.net.InetSocketAddress; - -public class TajoMasterInfo { - - private boolean available; - private boolean isActive; - - private InetSocketAddress tajoMasterAddress; - private InetSocketAddress tajoClientAddress; - private InetSocketAddress workerResourceTrackerAddr; - private InetSocketAddress catalogAddress; - private InetSocketAddress webServerAddress; - - public InetSocketAddress getTajoMasterAddress() { - return tajoMasterAddress; - } - - public void setTajoMasterAddress(InetSocketAddress tajoMasterAddress) { - this.tajoMasterAddress = tajoMasterAddress; - } - - public InetSocketAddress getTajoClientAddress() { - return tajoClientAddress; - } - - public void setTajoClientAddress(InetSocketAddress tajoClientAddress) { - this.tajoClientAddress = tajoClientAddress; - } - - public InetSocketAddress getWorkerResourceTrackerAddr() { - return workerResourceTrackerAddr; - } - - public void setWorkerResourceTrackerAddr(InetSocketAddress workerResourceTrackerAddr) { - this.workerResourceTrackerAddr = workerResourceTrackerAddr; - } - - public InetSocketAddress getCatalogAddress() { - return catalogAddress; - } - - public void setCatalogAddress(InetSocketAddress catalogAddress) { - this.catalogAddress = catalogAddress; - } - - public InetSocketAddress getWebServerAddress() { - return webServerAddress; - } - - public void setWebServerAddress(InetSocketAddress webServerAddress) { - this.webServerAddress = webServerAddress; - } - - public boolean isAvailable() { - return available; - } - - public void setAvailable(boolean available) { - this.available = available; - } - - public boolean isActive() { - return isActive; - } - - public void setActive(boolean active) { - isActive = active; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index 42ffd87..996d356 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -38,6 +38,8 @@ import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.worker.TajoWorker; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -45,6 +47,7 @@ import java.util.List; public class TajoContainerProxy extends ContainerProxy { private final QueryContext queryContext; + private final TajoWorker.WorkerContext workerContext; private final String planJson; public TajoContainerProxy(QueryMasterTask.QueryMasterTaskContext context, @@ -52,6 +55,7 @@ public class TajoContainerProxy extends ContainerProxy { QueryContext queryContext, ExecutionBlockId executionBlockId, String planJson) { super(context, conf, executionBlockId, container); this.queryContext = queryContext; + this.workerContext = context.getQueryMasterContext().getWorkerContext(); this.planJson = planJson; } @@ -171,27 +175,8 @@ public class TajoContainerProxy extends ContainerProxy { RpcConnectionPool connPool = RpcConnectionPool.getPool(); NettyClientBase tmClient = null; try { - // In TajoMaster HA mode, if backup master be active status, - // worker may fail to connect existing active master. Thus, - // if worker can't connect the master, worker should try to connect another master and - // update master address in worker context. - TajoConf conf = context.getConf(); - if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - QueryCoordinatorProtocol.class, true); - } catch (Exception e) { - context.getQueryMasterContext().getWorkerContext().setWorkerResourceTrackerAddr( - HAServiceUtil.getResourceTrackerAddress(conf)); - context.getQueryMasterContext().getWorkerContext().setTajoMasterAddress( - HAServiceUtil.getMasterUmbilicalAddress(conf)); - tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - QueryCoordinatorProtocol.class, true); - } - } else { - tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - QueryCoordinatorProtocol.class, true); - } + ServiceTracker serviceTracker = context.getQueryMasterContext().getWorkerContext().getServiceTracker(); + tmClient = connPool.getConnection(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true); QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); masterClientService.releaseWorkerResource(null, http://git-wip-us.apache.org/repos/asf/tajo/blob/4595375f/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index 786025a..a11606f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -40,18 +40,18 @@ import org.apache.tajo.catalog.LocalCatalogWrapper; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.function.FunctionLoader; -import org.apache.tajo.ha.HAService; -import org.apache.tajo.ha.HAServiceHDFSImpl; -import org.apache.tajo.metrics.CatalogMetricsGaugeSet; -import org.apache.tajo.metrics.WorkerResourceMetricsGaugeSet; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.master.rm.WorkerResourceManager; -import org.apache.tajo.session.SessionManager; +import org.apache.tajo.metrics.CatalogMetricsGaugeSet; +import org.apache.tajo.metrics.WorkerResourceMetricsGaugeSet; import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rule.EvaluationContext; import org.apache.tajo.rule.EvaluationFailedException; import org.apache.tajo.rule.SelfDiagnosisRuleEngine; import org.apache.tajo.rule.SelfDiagnosisRuleSession; +import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.service.ServiceTrackerFactory; +import org.apache.tajo.session.SessionManager; import org.apache.tajo.storage.FileStorageManager; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.util.*; @@ -127,7 +127,7 @@ public class TajoMaster extends CompositeService { private TajoSystemMetrics systemMetrics; - private HAService haService; + private ServiceTracker haService; private JvmPauseMonitor pauseMonitor; @@ -226,15 +226,6 @@ public class TajoMaster extends CompositeService { } } - - private void initHAManger() throws Exception { - // If tajo provides haService based on ZooKeeper, following codes need to update. - if (systemConf.getBoolVar(ConfVars.TAJO_MASTER_HA_ENABLE)) { - haService = new HAServiceHDFSImpl(context); - haService.register(); - } - } - public boolean isActiveMaster() { return (haService != null ? haService.isActiveStatus() : true); } @@ -326,11 +317,8 @@ public class TajoMaster extends CompositeService { initSystemMetrics(); - try { - initHAManger(); - } catch (IOException e) { - LOG.error(e.getMessage(), e); - } + haService = ServiceTrackerFactory.get(systemConf); + haService.register(); historyWriter = new HistoryWriter(getMasterName(), true); historyWriter.init(getConfig()); @@ -477,7 +465,7 @@ public class TajoMaster extends CompositeService { return systemMetrics; } - public HAService getHAService() { + public ServiceTracker getHAService() { return haService; }
