Author: smohanty
Date: Sat Apr 13 04:31:57 2013
New Revision: 1467544

URL: http://svn.apache.org/r1467544
Log:
AMBARI-1926. One HBase master should have active HA status at all times.
(smohanty)

Modified:
    incubator/ambari/trunk/CHANGES.txt
    incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScanner.java
    incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScannerTest.java

Modified: incubator/ambari/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/CHANGES.txt?rev=1467544&r1=1467543&r2=1467544&view=diff
==============================================================================
--- incubator/ambari/trunk/CHANGES.txt (original)
+++ incubator/ambari/trunk/CHANGES.txt Sat Apr 13 04:31:57 2013
@@ -713,6 +713,9 @@ Trunk (unreleased changes):
 
  BUG FIXES
 
+ AMBARI-1926. One HBase master should have active HA status at all time. 
+ (smohanty)
+
  AMBARI-1925. Remove "hadoop_deploy" user. (smohanty)
 
  AMBARI-1915. Client install tasks are shown twice in install progress 

Modified: incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScanner.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScanner.java?rev=1467544&r1=1467543&r2=1467544&view=diff
==============================================================================
--- 
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScanner.java
 (original)
+++ 
incubator/ambari/trunk/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScanner.java
 Sat Apr 13 04:31:57 2013
@@ -21,11 +21,13 @@ import com.google.inject.Singleton;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.state.Cluster;
@@ -45,11 +47,72 @@ public class HBaseMasterPortScanner impl
   private static Log LOG = LogFactory.getLog(HBaseMasterPortScanner.class);
   private Thread schedulerThread = null;
   private final Object wakeupSyncObject = new Object();
-  private int scanTimeoutMsc = 300;
+  private int defaultScanTimeoutMsc = 300;
+  private int scanTimeoutMsc = defaultScanTimeoutMsc;
+  private int testScanTimeoutMsc;
+  private int rescanTimeoutMsc = 60000;
   private final int port = 60010;
-  private Set<ServiceComponentHost> componentHostSet;
+  private int maxAttempts = 3;
+  private int attempts = 0;
+  private int countAttempts = 0;
+  private Map<ServiceComponentHost,Boolean> componentHostMap;
+  private Cluster currentCluster;
+  private Timer scheduleTimer;
+  private RescanSchedulerTask rescanSchedulerTask;
   @Inject
   private Clusters clusters;
+
+  /**
+   * 
+   * @param defaultScanTimeoutMsc set default timeout for port scan
+   */
+  public void setDefaultScanTimeoutMsc(int defaultScanTimeoutMsc) {
+    this.defaultScanTimeoutMsc = defaultScanTimeoutMsc;
+    this.scanTimeoutMsc = this.defaultScanTimeoutMsc;
+  }
+
+
+  /**
+   * 
+   * @param maxAttempts set maximum attempts to scan
+   */
+  public void setMaxAttempts(int maxAttempts) {
+    this.maxAttempts = maxAttempts;
+  }
+
+  /**
+   * 
+   * @param rescanTimeoutMsc timeout for latter rescan
+   */
+  public void setRescanTimeoutMsc(int rescanTimeoutMsc) {
+    this.rescanTimeoutMsc = rescanTimeoutMsc;
+  }
+
+  /**
+   * 
+   * @return tested value (need unitests)
+   */
+  public int getTestScanTimeoutMsc() {
+    return testScanTimeoutMsc;
+  }
+
+  /**
+   * 
+   * @return count attempts (need unitests)
+   */
+  public int getCountAttempts() {
+    return countAttempts;
+  }
+
+  /**
+   * 
+   * @return task for latter scan
+   */
+  public RescanSchedulerTask getRescanSchedulerTask() {
+    return rescanSchedulerTask;
+  }
+  
+  
   /**
    * true if scanner should run ASAP. We need this flag to avoid sleep in
    * situations, when we receive updateHBaseMaster request during running a
@@ -58,11 +121,13 @@ public class HBaseMasterPortScanner impl
   private boolean activeAwakeRequest = false;
 
   public HBaseMasterPortScanner(int scanTimeoutMsc) {
+    this.defaultScanTimeoutMsc = scanTimeoutMsc;
     this.scanTimeoutMsc = scanTimeoutMsc;
     this.start();
   }
 
   public HBaseMasterPortScanner() {
+     scheduleTimer = new Timer();
      this.start();
   }
   
@@ -70,7 +135,7 @@ public class HBaseMasterPortScanner impl
     schedulerThread = new Thread(this, this.getClass().getSimpleName());
     schedulerThread.start();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("HBaseMasterPortScaner started");
+      LOG.debug("HBaseMasterPortScanner started");
     }
   }
 
@@ -84,20 +149,17 @@ public class HBaseMasterPortScanner impl
    * method is guaranteed to return quickly.
    */
   public void updateHBaseMaster(Cluster cluster) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("HBaseMasterPortScaner start scanning for cluster " + 
cluster.getClusterName());
-    }
     synchronized (wakeupSyncObject) {
       collectServiceComponentHostsForCluster(cluster);
-      activeAwakeRequest = true;
-      wakeupSyncObject.notify();
+      if(!componentHostMap.isEmpty()){
+        LOG.debug("HBaseMasterPortScanner start scanning for cluster " + 
cluster.getClusterName());
+        activeAwakeRequest = true;
+        wakeupSyncObject.notify();
+      } else LOG.debug("No for scan (with HBaseMaster component)");
     }
   }
 
   public void updateHBaseMaster(Host host) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("HBaseMasterPortScaner start scanning for Host " + 
host.getHostName());
-    }
     synchronized (wakeupSyncObject) {
       Set<Cluster> clustersSet;
       try {
@@ -109,15 +171,15 @@ public class HBaseMasterPortScanner impl
       while (iter.hasNext()) {
         collectServiceComponentHostsForCluster(iter.next());
       }
-      activeAwakeRequest = true;
-      wakeupSyncObject.notify();
+      if(!componentHostMap.isEmpty()){
+        LOG.debug("HBaseMasterPortScanner start scanning for Host " + 
host.getHostName());
+        activeAwakeRequest = true;
+        wakeupSyncObject.notify();
+      } else LOG.debug("No for scan (with HBaseMaster component)");
     }
   }
 
   public void updateHBaseMaster(ServiceComponentHost host) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("HBaseMasterPortScaner start scanning for ServiceComponentHost 
" + host.getServiceComponentName());
-    }
     synchronized (wakeupSyncObject) {
       try {
         
collectServiceComponentHostsForCluster(clusters.getCluster(host.getClusterName()));
@@ -125,26 +187,30 @@ public class HBaseMasterPortScanner impl
         LOG.warn(ex);
         return;
       }
-      activeAwakeRequest = true;
-      wakeupSyncObject.notify();
+      if(!componentHostMap.isEmpty()){
+        LOG.debug("HBaseMasterPortScanner start scanning for 
ServiceComponentHost " + host.getServiceComponentName());
+        activeAwakeRequest = true;
+        wakeupSyncObject.notify();
+      } else LOG.debug("No for scan (with HBaseMaster component)");
     }
   }
 
   private void collectServiceComponentHostsForCluster(Cluster cluster) {
-    componentHostSet = new HashSet<ServiceComponentHost>();
+    currentCluster = cluster;
+    componentHostMap = new HashMap<ServiceComponentHost, Boolean>();
     Map<String, Host> hosts = null;
     try {
-      hosts = clusters.getHostsForCluster(cluster.getClusterName());
+      hosts = clusters.getHostsForCluster(currentCluster.getClusterName());
     } catch (AmbariException ex) {
       LOG.warn(ex);
       return;
     }
     for (Map.Entry<String, Host> entry : hosts.entrySet()) {
       if (entry.getValue() != null) {
-        List<ServiceComponentHost> componentHosts = 
cluster.getServiceComponentHosts(entry.getValue().getHostName());
+        List<ServiceComponentHost> componentHosts = 
currentCluster.getServiceComponentHosts(entry.getValue().getHostName());
         for (ServiceComponentHost componentHost : componentHosts) {
           if (componentHost != null && componentHost.getServiceComponentName() 
!= null && 
componentHost.getServiceComponentName().equals(Role.HBASE_MASTER.toString())) {
-            componentHostSet.add(componentHost);
+            componentHostMap.put(componentHost, false);
           }
         }
       }
@@ -155,22 +221,49 @@ public class HBaseMasterPortScanner impl
   @Override
   public void run() {
     while (true) {
+      if(rescanSchedulerTask != null){
+        rescanSchedulerTask.cancel();
+        scheduleTimer.purge();
+      }          
       activeAwakeRequest = false;
-      if (componentHostSet != null) {
-        Iterator<ServiceComponentHost> iter = componentHostSet.iterator();
-        while (iter.hasNext()) {
-          ServiceComponentHost componentHost = iter.next();
-          boolean active =
-                  scan(componentHost.getHostName());
-          componentHost.setHAState((active) ? "active" : "passive");
-
+      if (componentHostMap != null) {
+        for (Map.Entry<ServiceComponentHost, Boolean> entry : 
componentHostMap.entrySet()) {
+          entry.setValue(scan(entry.getKey().getHostName()));
           if (schedulerThread.isInterrupted()) {
+            scanTimeoutMsc = defaultScanTimeoutMsc;
             return;
           }
           if (activeAwakeRequest) {
+            scanTimeoutMsc = defaultScanTimeoutMsc;
+            attempts = 0;
             break;
           }
         }
+        attempts++;
+        countAttempts = attempts;
+        LOG.info("Attempt to scan of HBASE_MASTER port : "+ attempts);
+        if(validateScanResults(componentHostMap)){
+          //If results valid set it to ServiceComponentHost
+          setScanResults(componentHostMap);
+          scanTimeoutMsc = defaultScanTimeoutMsc;
+          attempts = 0;
+        } else {
+          if(attempts <= maxAttempts){
+            //Increase timeout
+            scanTimeoutMsc += defaultScanTimeoutMsc;
+            testScanTimeoutMsc = scanTimeoutMsc;
+            LOG.info("Increase timeout for scan HBASE_MASTER port to : "+ 
scanTimeoutMsc);
+            activeAwakeRequest = true;
+          } else {
+            LOG.info("No valid data about HBASE_MASTER, ports will rescanned 
after "+rescanTimeoutMsc/1000 + " seconds");
+            scanTimeoutMsc = defaultScanTimeoutMsc;
+            attempts = 0;
+            //Create task for latter scan
+            rescanSchedulerTask = new RescanSchedulerTask(currentCluster);
+            scheduleTimer.schedule(rescanSchedulerTask, rescanTimeoutMsc);
+          }
+        }        
+        
       }
       if (activeAwakeRequest) {
         activeAwakeRequest = false;
@@ -186,6 +279,29 @@ public class HBaseMasterPortScanner impl
     }
   }
 
+  private void setScanResults(Map<ServiceComponentHost, Boolean> scanResuls){
+    for (Map.Entry<ServiceComponentHost, Boolean> entry : 
scanResuls.entrySet()) {
+      entry.getKey().setHAState((entry.getValue()) ? "active" : "passive");
+    }
+    LOG.info("Set result of HBASE_MASTER scan");
+  }
+  
+  private boolean validateScanResults(Map<ServiceComponentHost, Boolean> 
scanResuls){
+    boolean res = false;
+    int activeMasters = 0;
+    for (Map.Entry<ServiceComponentHost, Boolean> entry : 
scanResuls.entrySet()) {
+      activeMasters += (entry.getValue()) ? 1 : 0;
+    }
+    if(activeMasters == 0 || activeMasters > 1) {
+      res = false;
+    }
+    else {
+      res = true;
+    } 
+    LOG.info("Results of HBASE_MASTER scan are "+ ((res) ? "valid" : 
"invalid"));
+    return res;  
+  }
+  
   private boolean scan(String hostname) {
     try {
       Socket socket = new Socket();
@@ -202,4 +318,21 @@ public class HBaseMasterPortScanner impl
       return false;
     }
   }
+  
+  private class RescanSchedulerTask  extends TimerTask  {
+
+    private Cluster cl;
+
+    public RescanSchedulerTask(Cluster cl) {
+      this.cl = cl;
+    }
+    
+    @Override
+    public void run() {
+      LOG.info("Start scheduled rescan of HBASE_MASTER ports for cluster "+ 
cl.getClusterName());
+      updateHBaseMaster(cl);
+    }
+    
+  }
+   
 }

Modified: incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScannerTest.java
URL: http://svn.apache.org/viewvc/incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScannerTest.java?rev=1467544&r1=1467543&r2=1467544&view=diff
==============================================================================
--- 
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScannerTest.java
 (original)
+++ 
incubator/ambari/trunk/ambari-server/src/test/java/org/apache/ambari/server/state/svccomphost/HBaseMasterPortScannerTest.java
 Sat Apr 13 04:31:57 2013
@@ -66,30 +66,51 @@ public class HBaseMasterPortScannerTest 
   private static Cluster cluster;
   private static Host host;
   private static ServiceComponentHost serviceComponentHost;
+  private static int scanTimeOut = 100; 
+  private static int reScanTimeOut = 3000;
+  private static int maxAttempts = 2;
 
   public HBaseMasterPortScannerTest() {
   }
 
+  private static void setUpPortState(boolean open) {
+    if (open) {
+      if (serverSocket == null || serverSocket.isClosed()) {
+        try {
+          serverSocket = new ServerSocket(60010);
+        } catch (IOException e) {
+          try {
+            serverSocket.close();
+          } catch (IOException ex) {
+            log.debug("Could not close on port: 60010");
+            log.error(ex.getMessage());
+          }
+          log.error("Could not listen on port: 60010");
+        }
+      }
+    } else {
+      if (serverSocket != null && !serverSocket.isClosed()) {
+        try {
+          serverSocket.close();
+          serverSocket = null;
+        } catch (IOException ex) {
+          log.debug("Could not close on port: 60010");
+          log.error(ex.getMessage());
+        }
+      }
+    }
+  }
+  
+  
   @BeforeClass
   public static void setUpClass() throws Exception {
     injector = Guice.createInjector(new InMemoryDefaultTestModule());
     injector.getInstance(GuiceJpaInitializer.class);
     hostnames = new ArrayList<String>();
-    hostnames.add("localhost");
-    hostnames.add("localhost1");
-    hostnames.add("localhost2");
-    hostnames.add("localhost3");
-    try {
-      serverSocket = new ServerSocket(60010);
-    } catch (IOException e) {
-      try {
-        serverSocket.close();
-      } catch (IOException ex) {
-        log.debug("Could not close on port: 60010");
-        log.error(ex.getMessage());
-      }
-      log.error("Could not listen on port: 60010");
-    }
+    hostnames.add("127.0.0.1");
+    hostnames.add("host1");
+    hostnames.add("host2");
+    hostnames.add("host3");
     scaner = injector.getInstance(HBaseMasterPortScanner.class);
     clusters = injector.getInstance(Clusters.class);
     metaInfo = injector.getInstance(AmbariMetaInfo.class);
@@ -107,7 +128,7 @@ public class HBaseMasterPortScannerTest 
       hostObject.setIPv6("ipv6");
       hostObject.setOsType(DummyOsType);
       hostNamesSet.add(hostname);
-      if (hostname.equals("localhost")) {
+      if (hostname.equals("127.0.0.1")) {
         host = hostObject;
       }
     }
@@ -115,9 +136,9 @@ public class HBaseMasterPortScannerTest 
     Service service = cluster.addService(HDFS);
     service.persist();
     service.addServiceComponent(NAMENODE).persist();
-    
service.getServiceComponent(NAMENODE).addServiceComponentHost("localhost").persist();
+    
service.getServiceComponent(NAMENODE).addServiceComponentHost("127.0.0.1").persist();
     service.addServiceComponent(DATANODE).persist();
-    
service.getServiceComponent(DATANODE).addServiceComponentHost("localhost").persist();
+    
service.getServiceComponent(DATANODE).addServiceComponentHost("127.0.0.1").persist();
     service = serviceFactory.createNew(cluster, "HBASE");
     cluster.addService(service);
     service.persist();
@@ -126,17 +147,16 @@ public class HBaseMasterPortScannerTest 
     service.persist();
     for (String hostname : hostnames) {
       
service.getServiceComponent(HBASE_MASTER).addServiceComponentHost(hostname).persist();
-      if (hostname.equals("localhost")) {
+      if (hostname.equals("127.0.0.1")) {
         serviceComponentHost = 
service.getServiceComponent(HBASE_MASTER).getServiceComponentHost(hostname);
       }
     }
-
   }
 
   @AfterClass
   public static void tearDownUpClass() {
     try {
-      serverSocket.close();
+      if(serverSocket!=null) serverSocket.close();
     } catch (IOException ex) {
       log.debug("Could not close on port: 60010");
       log.error(ex.getMessage());
@@ -145,7 +165,7 @@ public class HBaseMasterPortScannerTest 
 
   @Before
   public void setUp() throws AmbariException, Exception {
-    serviceComponentHost.convertToResponse().setHa_status("passive");
+    serviceComponentHost.setHAState("passive");
   }
 
   /**
@@ -153,9 +173,13 @@ public class HBaseMasterPortScannerTest 
    */
   @Test
   public void testUpdateHBaseMaster_Cluster() throws InterruptedException {
+    setUpPortState(true);
+    scaner.setDefaultScanTimeoutMsc(scanTimeOut);
+    scaner.setMaxAttempts(maxAttempts);
+    scaner.setRescanTimeoutMsc(reScanTimeOut);
     log.debug("updateHBaseMaster - pass Cluster");
     scaner.updateHBaseMaster(cluster);
-    Thread.sleep(2000);
+    Thread.sleep(1000);
     assertEquals("active", 
serviceComponentHost.convertToResponse().getHa_status());
   }
 
@@ -164,9 +188,13 @@ public class HBaseMasterPortScannerTest 
    */
   @Test
   public void testUpdateHBaseMaster_Host() throws InterruptedException {
+    setUpPortState(true);
+    scaner.setDefaultScanTimeoutMsc(scanTimeOut);
+    scaner.setMaxAttempts(maxAttempts);
+    scaner.setRescanTimeoutMsc(reScanTimeOut);
     log.debug("updateHBaseMaster - pass Host");
     scaner.updateHBaseMaster(host);
-    Thread.sleep(2000);
+    Thread.sleep(1000);
     assertEquals("active", 
serviceComponentHost.convertToResponse().getHa_status());
   }
 
@@ -175,9 +203,58 @@ public class HBaseMasterPortScannerTest 
    */
   @Test
   public void testUpdateHBaseMaster_ServiceComponentHost() throws 
InterruptedException {
+    setUpPortState(true);
+    scaner.setDefaultScanTimeoutMsc(scanTimeOut);
+    scaner.setMaxAttempts(maxAttempts);
+    scaner.setRescanTimeoutMsc(reScanTimeOut);    
     log.debug("updateHBaseMaster - pass ServiceComponentHost");
     scaner.updateHBaseMaster(serviceComponentHost);
-    Thread.sleep(2000);
+    Thread.sleep(1000);
     assertEquals("active", 
serviceComponentHost.convertToResponse().getHa_status());
   }
+
+  /**
+   * Test of multiple call of updateHBaseMaster method.
+   */
+  @Test
+  public void testMultipleCall() throws InterruptedException {
+    setUpPortState(true);
+    scaner.setDefaultScanTimeoutMsc(scanTimeOut);
+    scaner.setMaxAttempts(maxAttempts);
+    scaner.setRescanTimeoutMsc(reScanTimeOut);    
+    log.debug("updateHBaseMaster - pass ServiceComponentHost");
+    //Test if some call of updateHBaseMaster in short time
+    scaner.updateHBaseMaster(cluster);
+    scaner.updateHBaseMaster(host);
+    scaner.updateHBaseMaster(serviceComponentHost);
+    Thread.sleep(1000);
+    assertEquals("active", 
serviceComponentHost.convertToResponse().getHa_status());
+  }  
+  
+  /**
+   * Test case of if port is closed or not enough scan timeout.
+   */
+  @Test
+  public void testOfBrokenMasterScenario() throws InterruptedException {
+    setUpPortState(false);
+    scaner.setDefaultScanTimeoutMsc(scanTimeOut);
+    scaner.setMaxAttempts(maxAttempts);
+    scaner.setRescanTimeoutMsc(reScanTimeOut);
+    log.debug("testOfBrokenMasterScenario start");
+    scaner.updateHBaseMaster(cluster);
+    Thread.sleep(2000);
+    //Should not be active masters
+    assertEquals("passive", 
serviceComponentHost.convertToResponse().getHa_status());
+    serviceComponentHost.setHAState("passive");
+    //Scanner should try to scan maxAttempts times
+    assertEquals(maxAttempts, scaner.getCountAttempts()-1);
+    //Timeout for scan shoul be scanTimeOut * scaner.getCountAttempts()
+    assertEquals(scanTimeOut * scaner.getCountAttempts(), 
scaner.getTestScanTimeoutMsc());
+    //Task for latter scan shoul be created
+    assertNotNull(scaner.getRescanSchedulerTask());
+    setUpPortState(true);
+    Thread.sleep(3500);
+    //Test active masters after latter rescan
+    assertEquals("active", 
serviceComponentHost.convertToResponse().getHa_status());
+  }  
 }
\ No newline at end of file


Reply via email to