Author: smohanty
Date: Sat Apr 13 04:31:57 2013
New Revision: 1467544
URL: http://svn.apache.org/r1467544
Log:
AMBARI-1926. One HBase master should have active HA status at all time.
(smohanty)
Modified:
incubator/ambari/trunk/CHANGES.txt
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScanner.java
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScannerTest.java
Modified: incubator/ambari/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1467544&r1=1467543&r2=1467544&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Sat Apr 13 04:31:57 2013
@@ -713,6 +713,9 @@ Trunk (unreleased changes):
BUG FIXES
+ AMBARI-1926. One HBase master should have active HA status at all time.
+ (smohanty)
+
AMBARI-1925. Remove "hadoop_deploy" user. (smohanty)
AMBARI-1915. Client install tasks are shown twice in install progress
Modified:
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScanner.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScanner.java?rev=1467544&r1=1467543&r2=1467544&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScanner.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScanner.java
Sat Apr 13 04:31:57 2013
@@ -21,11 +21,13 @@ import com.google.inject.Singleton;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.state.Cluster;
@@ -45,11 +47,72 @@ public class HBaseMasterPortScanner impl
private static Log LOG = LogFactory.getLog(HBaseMasterPortScanner.class);
private Thread schedulerThread = null;
private final Object wakeupSyncObject = new Object();
- private int scanTimeoutMsc = 300;
+ private int defaultScanTimeoutMsc = 300;
+ private int scanTimeoutMsc = defaultScanTimeoutMsc;
+ private int testScanTimeoutMsc;
+ private int rescanTimeoutMsc = 60000;
private final int port = 60010;
- private Set<ServiceComponentHost> componentHostSet;
+ private int maxAttempts = 3;
+ private int attempts = 0;
+ private int countAttempts = 0;
+ private Map<ServiceComponentHost,Boolean> componentHostMap;
+ private Cluster currentCluster;
+ private Timer scheduleTimer;
+ private RescanSchedulerTask rescanSchedulerTask;
@Inject
private Clusters clusters;
+
+ /**
+ *
+ * @param defaultScanTimeoutMsc set default timeout for port scan
+ */
+ public void setDefaultScanTimeoutMsc(int defaultScanTimeoutMsc) {
+ this.defaultScanTimeoutMsc = defaultScanTimeoutMsc;
+ this.scanTimeoutMsc = this.defaultScanTimeoutMsc;
+ }
+
+
+ /**
+ *
+ * @param maxAttempts set maximum attempts to scan
+ */
+ public void setMaxAttempts(int maxAttempts) {
+ this.maxAttempts = maxAttempts;
+ }
+
+ /**
+ *
+ * @param rescanTimeoutMsc timeout for later rescan
+ */
+ public void setRescanTimeoutMsc(int rescanTimeoutMsc) {
+ this.rescanTimeoutMsc = rescanTimeoutMsc;
+ }
+
+ /**
+ *
+ * @return tested value (needed by unit tests)
+ */
+ public int getTestScanTimeoutMsc() {
+ return testScanTimeoutMsc;
+ }
+
+ /**
+ *
+ * @return count of attempts (needed by unit tests)
+ */
+ public int getCountAttempts() {
+ return countAttempts;
+ }
+
+ /**
+ *
+ * @return task for later scan
+ */
+ public RescanSchedulerTask getRescanSchedulerTask() {
+ return rescanSchedulerTask;
+ }
+
+
/**
* true if scanner should run ASAP. We need this flag to avoid sleep in
* situations, when we receive updateHBaseMaster request during running a
@@ -58,11 +121,13 @@ public class HBaseMasterPortScanner impl
private boolean activeAwakeRequest = false;
public HBaseMasterPortScanner(int scanTimeoutMsc) {
+ this.defaultScanTimeoutMsc = scanTimeoutMsc;
this.scanTimeoutMsc = scanTimeoutMsc;
this.start();
}
public HBaseMasterPortScanner() {
+ scheduleTimer = new Timer();
this.start();
}
@@ -70,7 +135,7 @@ public class HBaseMasterPortScanner impl
schedulerThread = new Thread(this, this.getClass().getSimpleName());
schedulerThread.start();
if (LOG.isDebugEnabled()) {
- LOG.debug("HBaseMasterPortScaner started");
+ LOG.debug("HBaseMasterPortScanner started");
}
}
@@ -84,20 +149,17 @@ public class HBaseMasterPortScanner impl
* method is guaranteed to return quickly.
*/
public void updateHBaseMaster(Cluster cluster) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("HBaseMasterPortScaner start scanning for cluster " +
cluster.getClusterName());
- }
synchronized (wakeupSyncObject) {
collectServiceComponentHostsForCluster(cluster);
- activeAwakeRequest = true;
- wakeupSyncObject.notify();
+ if(!componentHostMap.isEmpty()){
+ LOG.debug("HBaseMasterPortScanner start scanning for cluster " +
cluster.getClusterName());
+ activeAwakeRequest = true;
+ wakeupSyncObject.notify();
+ } else LOG.debug("No for scan (with HBaseMaster component)");
}
}
public void updateHBaseMaster(Host host) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("HBaseMasterPortScaner start scanning for Host " +
host.getHostName());
- }
synchronized (wakeupSyncObject) {
Set<Cluster> clustersSet;
try {
@@ -109,15 +171,15 @@ public class HBaseMasterPortScanner impl
while (iter.hasNext()) {
collectServiceComponentHostsForCluster(iter.next());
}
- activeAwakeRequest = true;
- wakeupSyncObject.notify();
+ if(!componentHostMap.isEmpty()){
+ LOG.debug("HBaseMasterPortScanner start scanning for Host " +
host.getHostName());
+ activeAwakeRequest = true;
+ wakeupSyncObject.notify();
+ } else LOG.debug("No for scan (with HBaseMaster component)");
}
}
public void updateHBaseMaster(ServiceComponentHost host) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("HBaseMasterPortScaner start scanning for ServiceComponentHost
" + host.getServiceComponentName());
- }
synchronized (wakeupSyncObject) {
try {
collectServiceComponentHostsForCluster(clusters.getCluster(host.getClusterName()));
@@ -125,26 +187,30 @@ public class HBaseMasterPortScanner impl
LOG.warn(ex);
return;
}
- activeAwakeRequest = true;
- wakeupSyncObject.notify();
+ if(!componentHostMap.isEmpty()){
+ LOG.debug("HBaseMasterPortScanner start scanning for
ServiceComponentHost " + host.getServiceComponentName());
+ activeAwakeRequest = true;
+ wakeupSyncObject.notify();
+ } else LOG.debug("No for scan (with HBaseMaster component)");
}
}
private void collectServiceComponentHostsForCluster(Cluster cluster) {
- componentHostSet = new HashSet<ServiceComponentHost>();
+ currentCluster = cluster;
+ componentHostMap = new HashMap<ServiceComponentHost, Boolean>();
Map<String, Host> hosts = null;
try {
- hosts = clusters.getHostsForCluster(cluster.getClusterName());
+ hosts = clusters.getHostsForCluster(currentCluster.getClusterName());
} catch (AmbariException ex) {
LOG.warn(ex);
return;
}
for (Map.Entry<String, Host> entry : hosts.entrySet()) {
if (entry.getValue() != null) {
- List<ServiceComponentHost> componentHosts =
cluster.getServiceComponentHosts(entry.getValue().getHostName());
+ List<ServiceComponentHost> componentHosts =
currentCluster.getServiceComponentHosts(entry.getValue().getHostName());
for (ServiceComponentHost componentHost : componentHosts) {
if (componentHost != null && componentHost.getServiceComponentName()
!= null &&
componentHost.getServiceComponentName().equals(Role.HBASE_MASTER.toString())) {
- componentHostSet.add(componentHost);
+ componentHostMap.put(componentHost, false);
}
}
}
@@ -155,22 +221,49 @@ public class HBaseMasterPortScanner impl
@Override
public void run() {
while (true) {
+ if(rescanSchedulerTask != null){
+ rescanSchedulerTask.cancel();
+ scheduleTimer.purge();
+ }
activeAwakeRequest = false;
- if (componentHostSet != null) {
- Iterator<ServiceComponentHost> iter = componentHostSet.iterator();
- while (iter.hasNext()) {
- ServiceComponentHost componentHost = iter.next();
- boolean active =
- scan(componentHost.getHostName());
- componentHost.setHAState((active) ? "active" : "passive");
-
+ if (componentHostMap != null) {
+ for (Map.Entry<ServiceComponentHost, Boolean> entry :
componentHostMap.entrySet()) {
+ entry.setValue(scan(entry.getKey().getHostName()));
if (schedulerThread.isInterrupted()) {
+ scanTimeoutMsc = defaultScanTimeoutMsc;
return;
}
if (activeAwakeRequest) {
+ scanTimeoutMsc = defaultScanTimeoutMsc;
+ attempts = 0;
break;
}
}
+ attempts++;
+ countAttempts = attempts;
+ LOG.info("Attempt to scan of HBASE_MASTER port : "+ attempts);
+ if(validateScanResults(componentHostMap)){
+ //If results valid set it to ServiceComponentHost
+ setScanResults(componentHostMap);
+ scanTimeoutMsc = defaultScanTimeoutMsc;
+ attempts = 0;
+ } else {
+ if(attempts <= maxAttempts){
+ //Increase timeout
+ scanTimeoutMsc += defaultScanTimeoutMsc;
+ testScanTimeoutMsc = scanTimeoutMsc;
+ LOG.info("Increase timeout for scan HBASE_MASTER port to : "+
scanTimeoutMsc);
+ activeAwakeRequest = true;
+ } else {
+ LOG.info("No valid data about HBASE_MASTER, ports will rescanned
after "+rescanTimeoutMsc/1000 + " seconds");
+ scanTimeoutMsc = defaultScanTimeoutMsc;
+ attempts = 0;
+ //Create task for later scan
+ rescanSchedulerTask = new RescanSchedulerTask(currentCluster);
+ scheduleTimer.schedule(rescanSchedulerTask, rescanTimeoutMsc);
+ }
+ }
+
}
if (activeAwakeRequest) {
activeAwakeRequest = false;
@@ -186,6 +279,29 @@ public class HBaseMasterPortScanner impl
}
}
+ private void setScanResults(Map<ServiceComponentHost, Boolean> scanResuls){
+ for (Map.Entry<ServiceComponentHost, Boolean> entry :
scanResuls.entrySet()) {
+ entry.getKey().setHAState((entry.getValue()) ? "active" : "passive");
+ }
+ LOG.info("Set result of HBASE_MASTER scan");
+ }
+
+ private boolean validateScanResults(Map<ServiceComponentHost, Boolean>
scanResuls){
+ boolean res = false;
+ int activeMasters = 0;
+ for (Map.Entry<ServiceComponentHost, Boolean> entry :
scanResuls.entrySet()) {
+ activeMasters += (entry.getValue()) ? 1 : 0;
+ }
+ if(activeMasters == 0 || activeMasters > 1) {
+ res = false;
+ }
+ else {
+ res = true;
+ }
+ LOG.info("Results of HBASE_MASTER scan are "+ ((res) ? "valid" :
"invalid"));
+ return res;
+ }
+
private boolean scan(String hostname) {
try {
Socket socket = new Socket();
@@ -202,4 +318,21 @@ public class HBaseMasterPortScanner impl
return false;
}
}
+
+ private class RescanSchedulerTask extends TimerTask {
+
+ private Cluster cl;
+
+ public RescanSchedulerTask(Cluster cl) {
+ this.cl = cl;
+ }
+
+ @Override
+ public void run() {
+ LOG.info("Start scheduled rescan of HBASE_MASTER ports for cluster "+
cl.getClusterName());
+ updateHBaseMaster(cl);
+ }
+
+ }
+
}
Modified:
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScannerTest.java
URL:
http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScannerTest.java?rev=1467544&r1=1467543&r2=1467544&view=diff
==============================================================================
---
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScannerTest.java
(original)
+++
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScannerTest.java
Sat Apr 13 04:31:57 2013
@@ -66,30 +66,51 @@ public class HBaseMasterPortScannerTest
private static Cluster cluster;
private static Host host;
private static ServiceComponentHost serviceComponentHost;
+ private static int scanTimeOut = 100;
+ private static int reScanTimeOut = 3000;
+ private static int maxAttempts = 2;
public HBaseMasterPortScannerTest() {
}
+ private static void setUpPortState(boolean open) {
+ if (open) {
+ if (serverSocket == null || serverSocket.isClosed()) {
+ try {
+ serverSocket = new ServerSocket(60010);
+ } catch (IOException e) {
+ try {
+ serverSocket.close();
+ } catch (IOException ex) {
+ log.debug("Could not close on port: 60010");
+ log.error(ex.getMessage());
+ }
+ log.error("Could not listen on port: 60010");
+ }
+ }
+ } else {
+ if (serverSocket != null && !serverSocket.isClosed()) {
+ try {
+ serverSocket.close();
+ serverSocket = null;
+ } catch (IOException ex) {
+ log.debug("Could not close on port: 60010");
+ log.error(ex.getMessage());
+ }
+ }
+ }
+ }
+
+
@BeforeClass
public static void setUpClass() throws Exception {
injector = Guice.createInjector(new InMemoryDefaultTestModule());
injector.getInstance(GuiceJpaInitializer.class);
hostnames = new ArrayList<String>();
- hostnames.add("localhost");
- hostnames.add("localhost1");
- hostnames.add("localhost2");
- hostnames.add("localhost3");
- try {
- serverSocket = new ServerSocket(60010);
- } catch (IOException e) {
- try {
- serverSocket.close();
- } catch (IOException ex) {
- log.debug("Could not close on port: 60010");
- log.error(ex.getMessage());
- }
- log.error("Could not listen on port: 60010");
- }
+ hostnames.add("127.0.0.1");
+ hostnames.add("host1");
+ hostnames.add("host2");
+ hostnames.add("host3");
scaner = injector.getInstance(HBaseMasterPortScanner.class);
clusters = injector.getInstance(Clusters.class);
metaInfo = injector.getInstance(AmbariMetaInfo.class);
@@ -107,7 +128,7 @@ public class HBaseMasterPortScannerTest
hostObject.setIPv6("ipv6");
hostObject.setOsType(DummyOsType);
hostNamesSet.add(hostname);
- if (hostname.equals("localhost")) {
+ if (hostname.equals("127.0.0.1")) {
host = hostObject;
}
}
@@ -115,9 +136,9 @@ public class HBaseMasterPortScannerTest
Service service = cluster.addService(HDFS);
service.persist();
service.addServiceComponent(NAMENODE).persist();
-
service.getServiceComponent(NAMENODE).addServiceComponentHost("localhost").persist();
+
service.getServiceComponent(NAMENODE).addServiceComponentHost("127.0.0.1").persist();
service.addServiceComponent(DATANODE).persist();
-
service.getServiceComponent(DATANODE).addServiceComponentHost("localhost").persist();
+
service.getServiceComponent(DATANODE).addServiceComponentHost("127.0.0.1").persist();
service = serviceFactory.createNew(cluster, "HBASE");
cluster.addService(service);
service.persist();
@@ -126,17 +147,16 @@ public class HBaseMasterPortScannerTest
service.persist();
for (String hostname : hostnames) {
service.getServiceComponent(HBASE_MASTER).addServiceComponentHost(hostname).persist();
- if (hostname.equals("localhost")) {
+ if (hostname.equals("127.0.0.1")) {
serviceComponentHost =
service.getServiceComponent(HBASE_MASTER).getServiceComponentHost(hostname);
}
}
-
}
@AfterClass
public static void tearDownUpClass() {
try {
- serverSocket.close();
+ if(serverSocket!=null) serverSocket.close();
} catch (IOException ex) {
log.debug("Could not close on port: 60010");
log.error(ex.getMessage());
@@ -145,7 +165,7 @@ public class HBaseMasterPortScannerTest
@Before
public void setUp() throws AmbariException, Exception {
- serviceComponentHost.convertToResponse().setHa_status("passive");
+ serviceComponentHost.setHAState("passive");
}
/**
@@ -153,9 +173,13 @@ public class HBaseMasterPortScannerTest
*/
@Test
public void testUpdateHBaseMaster_Cluster() throws InterruptedException {
+ setUpPortState(true);
+ scaner.setDefaultScanTimeoutMsc(scanTimeOut);
+ scaner.setMaxAttempts(maxAttempts);
+ scaner.setRescanTimeoutMsc(reScanTimeOut);
log.debug("updateHBaseMaster - pass Cluster");
scaner.updateHBaseMaster(cluster);
- Thread.sleep(2000);
+ Thread.sleep(1000);
assertEquals("active",
serviceComponentHost.convertToResponse().getHa_status());
}
@@ -164,9 +188,13 @@ public class HBaseMasterPortScannerTest
*/
@Test
public void testUpdateHBaseMaster_Host() throws InterruptedException {
+ setUpPortState(true);
+ scaner.setDefaultScanTimeoutMsc(scanTimeOut);
+ scaner.setMaxAttempts(maxAttempts);
+ scaner.setRescanTimeoutMsc(reScanTimeOut);
log.debug("updateHBaseMaster - pass Host");
scaner.updateHBaseMaster(host);
- Thread.sleep(2000);
+ Thread.sleep(1000);
assertEquals("active",
serviceComponentHost.convertToResponse().getHa_status());
}
@@ -175,9 +203,58 @@ public class HBaseMasterPortScannerTest
*/
@Test
public void testUpdateHBaseMaster_ServiceComponentHost() throws
InterruptedException {
+ setUpPortState(true);
+ scaner.setDefaultScanTimeoutMsc(scanTimeOut);
+ scaner.setMaxAttempts(maxAttempts);
+ scaner.setRescanTimeoutMsc(reScanTimeOut);
log.debug("updateHBaseMaster - pass ServiceComponentHost");
scaner.updateHBaseMaster(serviceComponentHost);
- Thread.sleep(2000);
+ Thread.sleep(1000);
assertEquals("active",
serviceComponentHost.convertToResponse().getHa_status());
}
+
+ /**
+ * Test of multiple calls of updateHBaseMaster method.
+ */
+ @Test
+ public void testMultipleCall() throws InterruptedException {
+ setUpPortState(true);
+ scaner.setDefaultScanTimeoutMsc(scanTimeOut);
+ scaner.setMaxAttempts(maxAttempts);
+ scaner.setRescanTimeoutMsc(reScanTimeOut);
+ log.debug("updateHBaseMaster - pass ServiceComponentHost");
+ //Test several calls of updateHBaseMaster within a short time
+ scaner.updateHBaseMaster(cluster);
+ scaner.updateHBaseMaster(host);
+ scaner.updateHBaseMaster(serviceComponentHost);
+ Thread.sleep(1000);
+ assertEquals("active",
serviceComponentHost.convertToResponse().getHa_status());
+ }
+
+ /**
+ * Test case for when the port is closed or the scan timeout is not long enough.
+ */
+ @Test
+ public void testOfBrokenMasterScenario() throws InterruptedException {
+ setUpPortState(false);
+ scaner.setDefaultScanTimeoutMsc(scanTimeOut);
+ scaner.setMaxAttempts(maxAttempts);
+ scaner.setRescanTimeoutMsc(reScanTimeOut);
+ log.debug("testOfBrokenMasterScenario start");
+ scaner.updateHBaseMaster(cluster);
+ Thread.sleep(2000);
+ //Should not be active masters
+ assertEquals("passive",
serviceComponentHost.convertToResponse().getHa_status());
+ serviceComponentHost.setHAState("passive");
+ //Scanner should try to scan maxAttempts times
+ assertEquals(maxAttempts, scaner.getCountAttempts()-1);
+ //Timeout for scan should be scanTimeOut * scaner.getCountAttempts()
+ assertEquals(scanTimeOut * scaner.getCountAttempts(),
scaner.getTestScanTimeoutMsc());
+ //Task for later scan should be created
+ assertNotNull(scaner.getRescanSchedulerTask());
+ setUpPortState(true);
+ Thread.sleep(3500);
+ //Test active masters after later rescan
+ assertEquals("active",
serviceComponentHost.convertToResponse().getHa_status());
+ }
}
\ No newline at end of file