Repository: helix
Updated Branches:
  refs/heads/master b8355b9a6 -> fa7cabf7e


Make Task limitation configurable and refactor the system property keys

Current limitation is 10K, which is too restrict for user submitting tasks. In 
this rb, we reset to 100K as the default and allow user setting system property 
for configuring this value.

RB=1271027
BUG=HELIX-897
G=helix-reviewers
A=hrzhang


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/fa7cabf7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/fa7cabf7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/fa7cabf7

Branch: refs/heads/master
Commit: fa7cabf7ea12a4046b824a674294887bae82a99b
Parents: a8af64b
Author: Junkai Xue <[email protected]>
Authored: Tue Apr 3 17:04:43 2018 -0700
Committer: Junkai Xue <[email protected]>
Committed: Thu Apr 19 17:40:30 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/SystemPropertyKeys.java    | 30 ++++++++++++
 .../helix/manager/zk/CallbackHandler.java       |  9 ++--
 .../apache/helix/manager/zk/ZKHelixManager.java | 49 +++++++++-----------
 .../java/org/apache/helix/task/TaskDriver.java  |  7 ++-
 .../java/org/apache/helix/util/HelixUtil.java   | 45 ++++++++++++++++++
 5 files changed, 106 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/fa7cabf7/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java 
b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
new file mode 100644
index 0000000..2277258
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -0,0 +1,30 @@
+package org.apache.helix;
+
+public class SystemPropertyKeys {
+  // Task Driver
+  public static final String TASK_CONFIG_LIMITATION = 
"helixTask.configsLimitation";
+
+  // ZKHelixManager
+  public static final String CLUSTER_MANAGER_VERSION = 
"cluster-manager-version.properties";
+
+  public static final String FLAPPING_TIME_WINDOW = 
"helixmanager.flappingTimeWindow";
+
+  public static final String MAX_DISCONNECT_THRESHOLD = 
"helixmanager.maxDisconnectThreshold";
+
+  public static final String ZK_SESSION_TIMEOUT = "zk.session.timeout";
+
+  public static final String ZK_CONNECTION_TIMEOUT = "zk.connection.timeout";
+
+  public static final String ZK_REESTABLISHMENT_CONNECTION_TIMEOUT =
+      "zk.connectionReEstablishment.timeout";
+
+  public static final String ZK_WAIT_CONNECTED_TIMEOUT = 
"helixmanager.waitForConnectedTimeout";
+
+  public static final String PARTICIPANT_HEALTH_REPORT_LATENCY =
+      "helixmanager.participantHealthReport.reportLatency";
+
+  // CallbackHandler
+  public static final String ASYNC_BATCH_MODE_ENABLED = 
"helix.callbackhandler.isAsyncBatchModeEnabled";
+
+  public static final String LEGACY_ASYNC_BATCH_MODE_ENABLED = 
"isAsyncBatchModeEnabled";
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/fa7cabf7/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 5890fb8..12f7d0f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -37,6 +37,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.api.listeners.ClusterConfigChangeListener;
 import org.apache.helix.api.listeners.ConfigChangeListener;
 import org.apache.helix.api.listeners.ControllerChangeListener;
@@ -222,11 +223,11 @@ public class CallbackHandler implements IZkChildListener, 
IZkDataListener {
     BatchMode batchMode = _listener.getClass().getAnnotation(BatchMode.class);
     PreFetch preFetch = _listener.getClass().getAnnotation(PreFetch.class);
 
-    String asyncBatchModeEnabled =
-        System.getProperty("helix.callbackhandler.isAsyncBatchModeEnabled");
+    String asyncBatchModeEnabled = 
System.getProperty(SystemPropertyKeys.ASYNC_BATCH_MODE_ENABLED);
     if (asyncBatchModeEnabled == null) {
-      // for back-compatible, the old property name is deprecated.
-      asyncBatchModeEnabled = System.getProperty("isAsyncBatchModeEnabled");
+      // for backcompatible, the old property name is deprecated.
+      asyncBatchModeEnabled =
+          
System.getProperty(SystemPropertyKeys.LEGACY_ASYNC_BATCH_MODE_ENABLED);
     }
 
     if (asyncBatchModeEnabled != null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/fa7cabf7/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 63a60c6..4eb037a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -48,6 +48,7 @@ import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.store.zk.AutoFallbackPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.util.HelixUtil;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper.States;
@@ -196,7 +197,7 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     _instanceName = instanceName;
     _preConnectCallbacks = new ArrayList<>();
     _handlers = new ArrayList<>();
-    _properties = new 
HelixManagerProperties("cluster-manager-version.properties");
+    _properties = new 
HelixManagerProperties(SystemPropertyKeys.CLUSTER_MANAGER_VERSION);
     _version = _properties.getVersion();
 
     _keyBuilder = new Builder(clusterName);
@@ -218,23 +219,31 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     /**
      * use system property if available
      */
-    _flappingTimeWindowMs = 
getSystemPropertyAsInt("helixmanager.flappingTimeWindow", 
ZKHelixManager.FLAPPING_TIME_WINDOW);
+    _flappingTimeWindowMs = 
HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.FLAPPING_TIME_WINDOW,
+        ZKHelixManager.FLAPPING_TIME_WINDOW);
 
-    _maxDisconnectThreshold =
-        getSystemPropertyAsInt("helixmanager.maxDisconnectThreshold", 
ZKHelixManager.MAX_DISCONNECT_THRESHOLD);
+    _maxDisconnectThreshold = HelixUtil
+        .getSystemPropertyAsInt(SystemPropertyKeys.MAX_DISCONNECT_THRESHOLD,
+            ZKHelixManager.MAX_DISCONNECT_THRESHOLD);
 
-    _sessionTimeout = getSystemPropertyAsInt("zk.session.timeout", 
ZkClient.DEFAULT_SESSION_TIMEOUT);
+    _sessionTimeout = 
HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.ZK_SESSION_TIMEOUT,
+        ZkClient.DEFAULT_SESSION_TIMEOUT);
 
-    _connectionInitTimeout = getSystemPropertyAsInt("zk.connection.timeout", 
ZkClient.DEFAULT_CONNECTION_TIMEOUT);
+    _connectionInitTimeout = HelixUtil
+        .getSystemPropertyAsInt(SystemPropertyKeys.ZK_CONNECTION_TIMEOUT,
+            ZkClient.DEFAULT_CONNECTION_TIMEOUT);
 
-    _connectionRetryTimeout = 
getSystemPropertyAsInt("zk.connectionReEstablishment.timeout",
-        DEFAULT_CONNECTION_ESTABLISHMENT_RETRY_TIMEOUT);
+    _connectionRetryTimeout = HelixUtil
+        
.getSystemPropertyAsInt(SystemPropertyKeys.ZK_REESTABLISHMENT_CONNECTION_TIMEOUT,
+            DEFAULT_CONNECTION_ESTABLISHMENT_RETRY_TIMEOUT);
 
-    _waitForConnectedTimeout = 
getSystemPropertyAsInt("helixmanager.waitForConnectedTimeout",
-        DEFAULT_WAIT_CONNECTED_TIMEOUT);
+    _waitForConnectedTimeout = HelixUtil
+        .getSystemPropertyAsInt(SystemPropertyKeys.ZK_WAIT_CONNECTED_TIMEOUT,
+            DEFAULT_WAIT_CONNECTED_TIMEOUT);
 
-    _reportLatency = 
getSystemPropertyAsInt("helixmanager.participantHealthReport.reportLatency",
-        ParticipantHealthReportTask.DEFAULT_REPORT_LATENCY);
+    _reportLatency = HelixUtil
+        
.getSystemPropertyAsInt(SystemPropertyKeys.PARTICIPANT_HEALTH_REPORT_LATENCY,
+            ParticipantHealthReportTask.DEFAULT_REPORT_LATENCY);
 
     /**
      * instance type specific init
@@ -272,22 +281,6 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     }
   }
 
-  private int getSystemPropertyAsInt(String propertyKey, int 
propertyDefaultValue) {
-    String valueString = System.getProperty(propertyKey, "" + 
propertyDefaultValue);
-
-    try {
-      int value = Integer.parseInt(valueString);
-      if (value > 0) {
-        return value;
-      }
-    } catch (NumberFormatException e) {
-      LOG.warn("Exception while parsing property: " + propertyKey + ", string: 
" + valueString
-          + ", using default value: " + propertyDefaultValue);
-    }
-
-    return propertyDefaultValue;
-  }
-
   @Override public boolean removeListener(PropertyKey key, Object listener) {
     LOG.info("Removing listener: " + listener + " on path: " + key.getPath() + 
" from cluster: "
         + _clusterName + " by instance: " + _instanceName);

http://git-wip-us.apache.org/repos/asf/helix/blob/fa7cabf7/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 4a7707a..a628dd3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -31,6 +31,7 @@ import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.util.HelixUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +55,7 @@ public class TaskDriver {
   /** For logging */
   private static final Logger LOG = LoggerFactory.getLogger(TaskDriver.class);
 
+
   /** Default time out for monitoring workflow or job state */
   private final static int _defaultTimeout = 3 * 60 * 1000; /* 3 mins */
 
@@ -65,10 +67,11 @@ public class TaskDriver {
   // return empty list.
   //
   // TODO Implement or configure the limitation in ZK server.
-  private final static int DEFAULT_CONFIGS_LIMITATION = 10000;
+  private final static long DEFAULT_CONFIGS_LIMITATION =
+      
HelixUtil.getSystemPropertyAsLong(SystemPropertyKeys.TASK_CONFIG_LIMITATION, 
100000L);
   private final static long TIMESTAMP_NOT_SET = -1L;
   private final static String TASK_START_TIME_KEY = "START_TIME";
-  protected int _configsLimitation = DEFAULT_CONFIGS_LIMITATION;
+  protected long _configsLimitation = DEFAULT_CONFIGS_LIMITATION;
 
   private final HelixDataAccessor _accessor;
   private final HelixPropertyStore<ZNRecord> _propertyStore;

http://git-wip-us.apache.org/repos/asf/helix/blob/fa7cabf7/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java 
b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index d0be481..7ae3630 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -238,4 +238,49 @@ public final class HelixUtil {
     }
     return false;
   }
+
+  /**
+   * Get the value of system property
+   * @param propertyKey
+   * @param propertyDefaultValue
+   * @return
+   */
+  public static int getSystemPropertyAsInt(String propertyKey, int 
propertyDefaultValue) {
+    String valueString = System.getProperty(propertyKey, "" + 
propertyDefaultValue);
+
+    try {
+      int value = Integer.parseInt(valueString);
+      if (value > 0) {
+        return value;
+      }
+    } catch (NumberFormatException e) {
+      LOG.warn("Exception while parsing property: " + propertyKey + ", string: 
" + valueString
+          + ", using default value: " + propertyDefaultValue);
+    }
+
+    return propertyDefaultValue;
+  }
+
+  /**
+   * Get the value of system property
+   * @param propertyKey
+   * @param propertyDefaultValue
+   * @return
+   */
+  public static long getSystemPropertyAsLong(String propertyKey, long 
propertyDefaultValue) {
+    String valueString = System.getProperty(propertyKey, "" + 
propertyDefaultValue);
+
+    try {
+      long value = Long.parseLong(valueString);
+      if (value > 0) {
+        return value;
+      }
+    } catch (NumberFormatException e) {
+      LOG.warn("Exception while parsing property: " + propertyKey + ", string: 
" + valueString
+          + ", using default value: " + propertyDefaultValue);
+    }
+
+    return propertyDefaultValue;
+  }
+
 }

Reply via email to