Repository: trafodion
Updated Branches:
  refs/heads/master 0f025ac62 -> 622e67bef


TRAFODION-2844 Add a restart strategy to DcsServer for restarting mxosrvr


Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/abfd6d61
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/abfd6d61
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/abfd6d61

Branch: refs/heads/master
Commit: abfd6d6115149c7bbe2c13907b709d08f2384112
Parents: 7acdca6
Author: aven <[email protected]>
Authored: Sun Dec 17 15:26:28 2017 +0800
Committer: aven <[email protected]>
Committed: Sun Dec 17 15:26:28 2017 +0800

----------------------------------------------------------------------
 dcs/pom.xml                                     |  10 ++
 .../main/java/org/trafodion/dcs/Constants.java  |   6 +
 .../org/trafodion/dcs/server/ServerManager.java | 122 +++++++++----------
 .../org/trafodion/dcs/util/RetryCounter.java    | 114 +++++++++++------
 .../trafodion/dcs/util/RetryCounterFactory.java |   4 +
 dcs/src/main/resources/dcs-default.xml          |   7 ++
 6 files changed, 161 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafodion/blob/abfd6d61/dcs/pom.xml
----------------------------------------------------------------------
diff --git a/dcs/pom.xml b/dcs/pom.xml
index 7581991..206e424 100644
--- a/dcs/pom.xml
+++ b/dcs/pom.xml
@@ -124,6 +124,16 @@
   
   <plugins>
     <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>2.5.1</version>
+        <inherited>true</inherited>
+        <configuration>
+            <source>1.7</source>
+            <target>1.7</target>
+        </configuration>
+    </plugin>
+    <plugin>
       <groupId>org.codehaus.mojo</groupId>
       <artifactId>properties-maven-plugin</artifactId>
       <version>1.0-alpha-2</version>

http://git-wip-us.apache.org/repos/asf/trafodion/blob/abfd6d61/dcs/src/main/java/org/trafodion/dcs/Constants.java
----------------------------------------------------------------------
diff --git a/dcs/src/main/java/org/trafodion/dcs/Constants.java 
b/dcs/src/main/java/org/trafodion/dcs/Constants.java
index 79cd8a6..63a59c5 100644
--- a/dcs/src/main/java/org/trafodion/dcs/Constants.java
+++ b/dcs/src/main/java/org/trafodion/dcs/Constants.java
@@ -201,6 +201,12 @@ public final class Constants {
     /** Default value for user program restart handler retry interval millis */
     public static final int 
DEFAULT_DCS_SERVER_USER_PROGRAM_RESTART_HANDLER_RETRY_INTERVAL_MILLIS = 5000;
 
+    /** Configuration key for user program restart handler retry timeout 
minutes */
+    public static final String 
DCS_SERVER_USER_PROGRAM_RESTART_HANDLER_RETRY_TIMEOUT_MINUTES = 
"dcs.server.user.program.restart.handler.retry.timeout.minutes";
+
+    /** Default value for user program restart handler retry timeout minutes */
+    public static final int 
DEFAULT_DCS_SERVER_USER_PROGRAM_RESTART_HANDLER_RETRY_TIMEOUT_MINUTES = 30;
+
     /** Name of ZooKeeper quorum configuration parameter. */
     public static final String ZOOKEEPER_QUORUM = "dcs.zookeeper.quorum";
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/abfd6d61/dcs/src/main/java/org/trafodion/dcs/server/ServerManager.java
----------------------------------------------------------------------
diff --git a/dcs/src/main/java/org/trafodion/dcs/server/ServerManager.java 
b/dcs/src/main/java/org/trafodion/dcs/server/ServerManager.java
index 3f41f79..81e74df 100644
--- a/dcs/src/main/java/org/trafodion/dcs/server/ServerManager.java
+++ b/dcs/src/main/java/org/trafodion/dcs/server/ServerManager.java
@@ -22,7 +22,6 @@
 **********************************************************************/
 package org.trafodion.dcs.server;
 
-import java.net.InetAddress;
 import java.io.*;
 import java.util.*;
 import java.util.concurrent.Callable;
@@ -31,9 +30,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.Future;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.CountDownLatch;
-import java.text.DateFormat;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,7 +44,6 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.KeeperException;
 import org.trafodion.dcs.Constants;
 import org.trafodion.dcs.util.Bytes;
-import org.trafodion.dcs.util.DcsConfiguration;
 import org.trafodion.dcs.util.DcsNetworkConfiguration;
 import org.trafodion.dcs.util.RetryCounter;
 import org.trafodion.dcs.util.RetryCounterFactory;
@@ -56,32 +53,32 @@ import org.trafodion.dcs.script.ScriptContext;
 
 public final class ServerManager implements Callable {
     private static final Log LOG = LogFactory.getLog(ServerManager.class);
-    private static Configuration conf;
-    private static ZkClient zkc;
-    private static boolean userProgEnabled;
-    private static String userProgramHome;
-    private static String userProgramCommand;
-    private static String hostName;
-    private static String masterHostName;
-    private static long masterStartTime;
-    private static int port;
-    private static int portRange;
-    private static DcsNetworkConfiguration netConf;
-    private static int instance;
-    private static int childServers;
-    private static String parentZnode;
-    private static int connectingTimeout;
-    private static int zkSessionTimeout;
-    private static int userProgExitAfterDisconnect;
-    private static int infoPort;
-    private static int maxHeapPctExit;
-    private static int statisticsIntervalTime;
-    private static int statisticsLimitTime;
-    private static String statisticsType;
-    private static String statisticsEnable;
-    private static String sqlplanEnable;
-    private static int userProgPortMapToSecs;
-    private static int userProgPortBindToSecs;
+    private Configuration conf;
+    private ZkClient zkc;
+    private boolean userProgEnabled;
+    private String userProgramHome;
+    private String userProgramCommand;
+    private String hostName;
+    private String masterHostName;
+    private long masterStartTime;
+    private int port;
+    private int portRange;
+    private DcsNetworkConfiguration netConf;
+    private int instance;
+    private int childServers;
+    private String parentZnode;
+    private int connectingTimeout;
+    private int zkSessionTimeout;
+    private int userProgExitAfterDisconnect;
+    private int infoPort;
+    private int maxHeapPctExit;
+    private int statisticsIntervalTime;
+    private int statisticsLimitTime;
+    private String statisticsType;
+    private String statisticsEnable;
+    private String sqlplanEnable;
+    private int userProgPortMapToSecs;
+    private int userProgPortBindToSecs;
     private ServerHandler[] serverHandlers;
     private int maxRestartAttempts;
     private int retryIntervalMillis;
@@ -97,7 +94,9 @@ public final class ServerManager implements Callable {
         public void process(WatchedEvent event) {
             if (event.getType() == Event.EventType.NodeDeleted) {
                 String znodePath = event.getPath();
-                LOG.debug("Registered znode deleted [" + znodePath + "]");
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Registered znode deleted [" + znodePath + "]");
+                }
                 try {
                     startSignal.countDown();
                 } catch (Exception e) {
@@ -124,12 +123,17 @@ public final class ServerManager implements Callable {
 
         public boolean monitor() {
             try {
-                LOG.debug("registered path [" + registeredPath + "]");
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("registered path [" + registeredPath + "]");
+                }
                 stat = zkc.exists(registeredPath, false);
                 if (stat != null) { // User program znode found in
                                     // /registered...check pid
                     isRunning = isPidRunning();
-                    LOG.debug("isRunning [" + isRunning + "]");
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("[" + (Integer.parseInt(nid) + 1) + ":" + 
childInstance + "]." + pid + ".isRunning ["
+                                + isRunning + "]");
+                    }
                 }
             } catch (Exception e) {
                 LOG.warn(e.getMessage(), e);
@@ -208,12 +212,12 @@ public final class ServerManager implements Callable {
 
         public void exec() throws Exception {
             cleanupZk();
-            LOG.info("User program exec [" + scriptContext.getCommand() + "]");
+            LOG.info("[" + childInstance + "]. User program exec [" + 
scriptContext.getCommand() + "]");
             ScriptManager.getInstance().runScript(scriptContext);// This will
                                                                  // block while
                                                                  // user prog 
is
                                                                  // running
-            LOG.info("User program exit [" + scriptContext.getExitCode() + 
"]");
+            LOG.info("[" + childInstance + "]. User program exit [" + 
scriptContext.getExitCode() + "]");
             StringBuilder sb = new StringBuilder();
             sb.append("exit code [" + scriptContext.getExitCode() + "]");
             if (!scriptContext.getStdOut().toString().isEmpty())
@@ -240,9 +244,9 @@ public final class ServerManager implements Callable {
                 Stat stat = zkc.exists(registeredPath, false);
                 if (stat != null)
                     zkc.delete(registeredPath, -1);
-            } catch (Exception e) {
+            } catch (KeeperException | InterruptedException e) {
                 e.printStackTrace();
-                LOG.debug(e);
+                LOG.error(e.getMessage(), e);
             }
         }
     }
@@ -255,33 +259,17 @@ public final class ServerManager implements Callable {
         CountDownLatch startSignal = new CountDownLatch(1);
         RetryCounter retryCounter;
 
-        public void reset() {
-            startSignal.countDown();
-            startSignal = new CountDownLatch(1);
-            boolean isRunning = this.serverMonitor.monitor();
-            String nid = this.serverMonitor.nid;
-            String pid = this.serverMonitor.pid;
-
-            if (isRunning) {
-                LOG.info("mxosrvr " + nid + "," + pid + " still running");
-                this.retryCounter.resetAttemptTimes();
-            } else {
-                LOG.info("mxosrvr " + nid + "," + pid + " exited, restarting, 
restart attempt time : "
-                        + this.retryCounter.getAttemptTimes());
-            }
-        }
-
         public ServerHandler(Configuration conf ,int childInstance) {
             int maxRestartAttempts = 
conf.getInt(Constants.DCS_SERVER_USER_PROGRAM_RESTART_HANDLER_ATTEMPTS,
                     
Constants.DEFAULT_DCS_SERVER_USER_PROGRAM_RESTART_HANDLER_ATTEMPTS);
-            int retryIntervalMillis = conf.getInt(
-                    
Constants.DCS_SERVER_USER_PROGRAM_RESTART_HANDLER_RETRY_INTERVAL_MILLIS,
-                    
Constants.DEFAULT_DCS_SERVER_USER_PROGRAM_RESTART_HANDLER_RETRY_INTERVAL_MILLIS);
+            int retryTimeoutMinutes = conf.getInt(
+                    
Constants.DCS_SERVER_USER_PROGRAM_RESTART_HANDLER_RETRY_TIMEOUT_MINUTES,
+                    
Constants.DEFAULT_DCS_SERVER_USER_PROGRAM_RESTART_HANDLER_RETRY_TIMEOUT_MINUTES);
             this.childInstance = childInstance;
             this.registeredPath = parentZnode
                     + Constants.DEFAULT_ZOOKEEPER_ZNODE_SERVERS_REGISTERED
                     + "/" + hostName + ":" + instance + ":" + childInstance;
-            retryCounter = RetryCounterFactory.create(maxRestartAttempts, 
retryIntervalMillis);
+            retryCounter = RetryCounterFactory.create(maxRestartAttempts, 
retryTimeoutMinutes, TimeUnit.MINUTES);
             serverMonitor = new ServerMonitor(childInstance, registeredPath);
             serverRunner = new ServerRunner(childInstance, registeredPath);
         }
@@ -294,7 +282,9 @@ public final class ServerManager implements Callable {
                 LOG.info("Server handler [" + instance + ":" + childInstance
                         + "] is running");
                 zkc.exists(registeredPath, new RegisteredWatcher(startSignal));
-                LOG.debug("Waiting for start signal");
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Waiting for start signal");
+                }
                 startSignal.await();
                 serverRunner.exec();
             } else {
@@ -361,7 +351,7 @@ public final class ServerManager implements Callable {
         serverHandlers = new ServerHandler[this.childServers];
     }
 
-    private static boolean isTrafodionRunning(String nid) {
+    private boolean isTrafodionRunning(String nid) {
 
         // Check if the given Node is up and running
         // return true else return false.
@@ -425,19 +415,23 @@ public final class ServerManager implements Callable {
             for (int childInstance = 1; childInstance <= childServers; 
childInstance++) {
                 serverHandlers[childInstance-1] = new ServerHandler(conf, 
childInstance);
                 completionService.submit(serverHandlers[childInstance-1]);
-                LOG.debug("Started server handler [" + instance + ":"
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Started server handler [" + instance + ":"
                         + childInstance + "]");
+                }
             }
 
             while (true) {
-                LOG.debug("Waiting for any server handler to finish");
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Waiting for any server handler to finish");
+                }
                 Future<Integer> f = completionService.take();// blocks waiting
                                                              // for any
                                                              // ServerHandler 
to
                                                              // finish
                 if (f != null) {
                     Integer result = f.get();
-                    LOG.debug("Server handler [" + instance + ":" + result + 
"] finished");
+                    LOG.info("Server handler [" + instance + ":" + result + "] 
exit");
 
                     retryCounter = 
RetryCounterFactory.create(maxRestartAttempts, retryIntervalMillis);
                     while (!isTrafodionRunning(nid)) {
@@ -451,9 +445,7 @@ public final class ServerManager implements Callable {
                     int childInstance = result.intValue();
                     // get the node id
                     ServerHandler previousServerHandler = 
serverHandlers[childInstance - 1];
-                    previousServerHandler.reset();
-                    if (previousServerHandler.retryCounter.shouldRetry()) {
-                        previousServerHandler.retryCounter.useRetry();
+                    if 
(previousServerHandler.retryCounter.shouldRetryInnerMinutes()) {
                         serverHandlers[childInstance - 1] = 
previousServerHandler;
                         completionService.submit(serverHandlers[childInstance 
- 1]);
                     } else {

http://git-wip-us.apache.org/repos/asf/trafodion/blob/abfd6d61/dcs/src/main/java/org/trafodion/dcs/util/RetryCounter.java
----------------------------------------------------------------------
diff --git a/dcs/src/main/java/org/trafodion/dcs/util/RetryCounter.java 
b/dcs/src/main/java/org/trafodion/dcs/util/RetryCounter.java
index b1c44b1..1300c72 100644
--- a/dcs/src/main/java/org/trafodion/dcs/util/RetryCounter.java
+++ b/dcs/src/main/java/org/trafodion/dcs/util/RetryCounter.java
@@ -20,54 +20,121 @@
 */
 package org.trafodion.dcs.util;
 
+import java.util.ArrayDeque;
+import java.util.Queue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 public class RetryCounter {
-  private static final Log LOG = LogFactory.getLog(RetryCounter.class);
-  private final int maxRetries;
-  private int retriesRemaining;
-  private final int retryIntervalMillis;
-  private final TimeUnit timeUnit;
+    private static final Log LOG = LogFactory.getLog(RetryCounter.class);
+    private final int maxRetries;
+    private int retriesRemaining;
+    // Retry interval for sleepUntilNextRetry, or window length for
+    // shouldRetryInnerMinutes; both expressed in timeUnit.
+    private final int retryInterval;
+    // Timestamps (epoch millis) of the most recent granted restarts,
+    // oldest first; never holds more than maxRetries entries.
+    private final Queue<Long> queue;
+    private final TimeUnit timeUnit;
 
-  public RetryCounter(int maxRetries, 
-  int retryIntervalMillis, TimeUnit timeUnit) {
-    this.maxRetries = maxRetries;
-    this.retriesRemaining = maxRetries;
-    this.retryIntervalMillis = retryIntervalMillis;
-    this.timeUnit = timeUnit;
-  }
+    public RetryCounter(int maxRetries, int retryInterval, TimeUnit timeUnit) {
+        this.maxRetries = maxRetries;
+        this.retriesRemaining = maxRetries;
+        this.retryInterval = retryInterval;
+        this.queue = new ArrayDeque<Long>();
+        this.timeUnit = timeUnit;
+    }
 
-  public int getMaxRetries() {
-    return maxRetries;
-  }
+    public int getMaxRetries() {
+        return maxRetries;
+    }
 
-  /**
-   * Sleep for a exponentially back off time
-   * @throws InterruptedException
-   */
-  public void sleepUntilNextRetry() throws InterruptedException {
-    int attempts = getAttemptTimes();
-    long sleepTime = (long) (retryIntervalMillis * Math.log(attempts+15));
-    LOG.info("Sleeping " + sleepTime + "ms before retry #" + attempts + "...");
-    timeUnit.sleep(sleepTime);
-  }
+    /**
+     * Sleeps for retryInterval * ln(attempts + 15) units of timeUnit, so the
+     * back-off grows (logarithmically) with the attempt number.
+     * @throws InterruptedException if interrupted while sleeping
+     */
+    public void sleepUntilNextRetry() throws InterruptedException {
+        int attempts = getAttemptTimes();
+        long sleepTime = (long) (retryInterval * Math.log(attempts + 15));
+        // Log the configured unit rather than a hard-coded "ms": this counter
+        // may be created with any TimeUnit (e.g. MINUTES).
+        LOG.info("Sleeping " + sleepTime + " " + timeUnit + " before retry #" + attempts + "...");
+        timeUnit.sleep(sleepTime);
+    }
 
-  public boolean shouldRetry() {
-    return retriesRemaining > 0;
-  }
+    public boolean shouldRetry() {
+        return retriesRemaining > 0;
+    }
 
-  public void useRetry() {
-    retriesRemaining--;
-  }
+    public void useRetry() {
+        retriesRemaining--;
+    }
 
-  public int getAttemptTimes() {
-    return maxRetries-retriesRemaining+1;
-  }
+    public int getAttemptTimes() {
+        return maxRetries - retriesRemaining + 1;
+    }
 
-  public void resetAttemptTimes() {
-      this.retriesRemaining = maxRetries;
-  }
+    public void resetAttemptTimes() {
+        this.retriesRemaining = maxRetries;
+    }
+
+    /**
+     * Sliding-window restart limiter: allows at most maxRetries restarts
+     * within any window of retryInterval (measured in timeUnit). Each granted
+     * restart is recorded with the current wall-clock time.
+     *
+     * @return true if a restart is allowed now (always true when retryInterval
+     *         is 0); false if maxRetries restarts were already granted within
+     *         the window, or if maxRetries is not positive
+     */
+    public boolean shouldRetryInnerMinutes() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("retryInterval = [" + retryInterval + "]. queue size = [" + queue.size() + "]. max retries = ["
+                    + getMaxRetries() + "]. ");
+        }
+        if (retryInterval == 0) {
+            return true;
+        }
+        if (getMaxRetries() <= 0) {
+            // No restarts allowed at all; without this guard the empty queue's
+            // peek() returned null and unboxing it threw NullPointerException.
+            LOG.error("reject!!! restart of mxosrvr is not allowed: max retries = [" + getMaxRetries() + "]");
+            return false;
+        }
+        long currentTime = System.currentTimeMillis();
+        if (queue.size() < getMaxRetries()) {
+            queue.offer(currentTime);
+            return true;
+        }
+        // The window is full: compare against the oldest recorded restart.
+        long firstRetryTime = queue.peek();
+        long delta = calcDelta(currentTime - firstRetryTime);
+        if (delta < 0) {
+            LOG.error("reject!!! attempt to restart mxosrvr in [ "
+                    + TimeUnit.MILLISECONDS.toMinutes(currentTime - firstRetryTime)
+                    + " ] minutes...can't restart mxosrvr large than [ " + getMaxRetries() + " ] times in [ "
+                    + this.retryInterval + " ] minutes");
+            return false;
+        }
+        // Slide the window: drop the oldest restart and record this one.
+        queue.poll();
+        queue.offer(currentTime);
+        return true;
+    }
+
+    /**
+     * Subtracts the window length from an elapsed time.
+     *
+     * @param realMillis elapsed wall-clock time in milliseconds
+     * @return realMillis minus retryInterval (converted to millis); negative while still inside the window
+     */
+    private long calcDelta(long realMillis) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("realMillis = [" + realMillis + "]. retryInterval = [" + timeUnit.toMillis(retryInterval) + "]");
+        }
+        return realMillis - timeUnit.toMillis(retryInterval);
+    }
 }

http://git-wip-us.apache.org/repos/asf/trafodion/blob/abfd6d61/dcs/src/main/java/org/trafodion/dcs/util/RetryCounterFactory.java
----------------------------------------------------------------------
diff --git a/dcs/src/main/java/org/trafodion/dcs/util/RetryCounterFactory.java 
b/dcs/src/main/java/org/trafodion/dcs/util/RetryCounterFactory.java
index 6c50552..761be4f 100644
--- a/dcs/src/main/java/org/trafodion/dcs/util/RetryCounterFactory.java
+++ b/dcs/src/main/java/org/trafodion/dcs/util/RetryCounterFactory.java
@@ -38,4 +38,8 @@ public class RetryCounterFactory {
     public static RetryCounter create(int maxRetries, int retryIntervalMillis) 
{
         return new RetryCounter(maxRetries, retryIntervalMillis, 
TimeUnit.MILLISECONDS);
     }
+
+    public static RetryCounter create(int maxRetries, int retryInterval, 
TimeUnit timeUnit) {
+        return new RetryCounter(maxRetries, retryInterval, timeUnit);
+    }
 }

http://git-wip-us.apache.org/repos/asf/trafodion/blob/abfd6d61/dcs/src/main/resources/dcs-default.xml
----------------------------------------------------------------------
diff --git a/dcs/src/main/resources/dcs-default.xml 
b/dcs/src/main/resources/dcs-default.xml
index c8e4590..ded30cf 100644
--- a/dcs/src/main/resources/dcs-default.xml
+++ b/dcs/src/main/resources/dcs-default.xml
@@ -372,4 +372,11 @@
         The classname of the DcsServer to start. Used for development of 
multithreaded server 
     </description>
   </property>
+  <property>
+    <name>dcs.server.user.program.restart.handler.retry.timeout.minutes</name>
+    <value>30</value>
+    <description>
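+        Length, in minutes, of the sliding window within which the server restart handler allows at most the configured maximum number of restart attempts; 0 disables the limit.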
+    </description>
+  </property>
 </configuration>

Reply via email to