[FLINK-2641] integrate off-heap memory configuration - add off-heap configuration parameter taskmanager.memory.off-heap - remove the off-heap ratio parameter and reuse the memory fraction parameter - set the JVM -XX:MaxDirectMemorySize parameter correctly
This closes #1129 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/76a40d59 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/76a40d59 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/76a40d59 Branch: refs/heads/master Commit: 76a40d59e6623cfbc6e265d26da4e739e5e7ed18 Parents: a3150a3 Author: Maximilian Michels <m...@apache.org> Authored: Mon Sep 14 15:07:23 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Wed Sep 16 16:16:03 2015 +0200 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 10 --- flink-dist/src/main/flink-bin/bin/config.sh | 51 +++++++++++++-- flink-dist/src/main/flink-bin/bin/jobmanager.sh | 10 +-- .../src/main/flink-bin/bin/taskmanager.sh | 38 ++++++++++-- .../io/network/buffer/NetworkBufferPool.java | 9 +-- .../flink/runtime/taskmanager/TaskManager.scala | 65 +++++++++++++------- 6 files changed, 132 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index bbaf71a..cd7fd76 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -125,11 +125,6 @@ public final class ConfigConstants { public static final String TASK_MANAGER_MEMORY_FRACTION_KEY = "taskmanager.memory.fraction"; /** - * The fraction of off-heap memory relative to the heap size. 
- */ - public static final String TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY = "taskmanager.memory.off-heap-ratio"; - - /** * The config parameter defining the memory allocation method (JVM heap or off-heap). */ public static final String TASK_MANAGER_MEMORY_OFF_HEAP_KEY = "taskmanager.memory.off-heap"; @@ -546,11 +541,6 @@ public final class ConfigConstants { * The default fraction of the free memory allocated by the task manager's memory manager. */ public static final float DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION = 0.7f; - - /** - * The default ratio of heap to off-heap memory, when the TaskManager is started with off-heap memory. - */ - public static final float DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO = 3.0f; /** * Default number of buffers used in the network stack. http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-dist/src/main/flink-bin/bin/config.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index 2aa9c78..f4f58f2 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -87,8 +87,17 @@ DEFAULT_ENV_SSH_OPTS="" # Optional SSH parameters ru # CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml ######################################################################################################################## -KEY_JOBM_HEAP_MB="jobmanager.heap.mb" -KEY_TASKM_HEAP_MB="taskmanager.heap.mb" +KEY_JOBM_MEM_SIZE="jobmanager.heap.mb" +KEY_TASKM_MEM_SIZE="taskmanager.heap.mb" +KEY_TASKM_MEM_MANAGED_SIZE="taskmanager.memory.size" +KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction" +KEY_TASKM_MEM_NETWORK_BUFFERS="taskmanager.network.numberOfBuffers" +# BEGIN:deprecated +KEY_TASKM_MEM_NETWORK_BUFFER_SIZE="taskmanager.network.bufferSizeInBytes" +# END:deprecated 
+KEY_TASKM_MEM_SEGMENT_SIZE="taskmanager.memory.segment-size" +KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap" + KEY_ENV_PID_DIR="env.pid.dir" KEY_ENV_LOG_MAX="env.log.max" KEY_ENV_JAVA_HOME="env.java.home" @@ -132,7 +141,8 @@ FLINK_ROOT_DIR_MANGLED=`manglePath "$FLINK_ROOT_DIR"` if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_ROOT_DIR_MANGLED/conf; fi FLINK_BIN_DIR=$FLINK_ROOT_DIR_MANGLED/bin FLINK_LOG_DIR=$FLINK_ROOT_DIR_MANGLED/log -YAML_CONF=${FLINK_CONF_DIR}/flink-conf.yaml +FLINK_CONF_FILE="flink-conf.yaml" +YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE} ######################################################################################################################## # ENVIRONMENT VARIABLES @@ -173,12 +183,37 @@ IS_NUMBER="^[0-9]+$" # Define FLINK_JM_HEAP if it is not already set if [ -z "${FLINK_JM_HEAP}" ]; then - FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_HEAP_MB} 0 "${YAML_CONF}") + FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}") fi # Define FLINK_TM_HEAP if it is not already set if [ -z "${FLINK_TM_HEAP}" ]; then - FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_HEAP_MB} 0 "${YAML_CONF}") + FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}") +fi + +# Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set +if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then + FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}") +fi + +# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set +if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then + FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0 "${YAML_CONF}") +fi + +# Define FLINK_TM_MEM_NETWORK_SIZE if it is not already set +if [ -z "${FLINK_TM_MEM_NETWORK_SIZE}" ]; then + BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_SEGMENT_SIZE} "0" "${YAML_CONF}") + if [ "${BUFFER_SIZE}" -eq "0" ]; then + BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_NETWORK_BUFFER_SIZE} "$((32 * 1024))" "${YAML_CONF}") + fi + 
NUM_BUFFERS=$(readFromConfig ${KEY_TASKM_MEM_NETWORK_BUFFERS} "2048" "${YAML_CONF}") + FLINK_TM_MEM_NETWORK_SIZE=$((((NUM_BUFFERS * BUFFER_SIZE) >> 20) + 1)) +fi + +# Define FLINK_TM_OFFHEAP if it is not already set +if [ -z "${FLINK_TM_OFFHEAP}" ]; then + FLINK_TM_OFFHEAP=$(readFromConfig ${KEY_TASKM_OFFHEAP} 0 "${YAML_CONF}") fi if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then @@ -211,7 +246,7 @@ fi # Arguments for the JVM. Used for job and task manager JVMs. # DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys -# KEY_JOBM_HEAP_MB and KEY_TASKM_HEAP_MB for that! +# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that! if [ -z "${JVM_ARGS}" ]; then JVM_ARGS="" fi @@ -308,3 +343,7 @@ readSlaves() { fi done < "$SLAVES_FILE" } + +useOffHeapMemory() { + [[ "`echo ${FLINK_TM_OFFHEAP} | tr '[:upper:]' '[:lower:]'`" == "true" ]] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-dist/src/main/flink-bin/bin/jobmanager.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh index c18a909..45b8e79 100755 --- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh +++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh @@ -43,21 +43,21 @@ if [[ $STARTSTOP == "start" ]]; then STREAMINGMODE="batch" fi - if [[ ! ${FLINK_JM_HEAP} =~ $IS_NUMBER ]]; then - echo "[ERROR] Configured JobManager JVM heap size is not a number. Please set '$KEY_JOBM_HEAP_MB' in $FLINK_CONF_FILE." + if [[ ! ${FLINK_JM_HEAP} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP}" -lt "0" ]]; then + echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}." exit 1 fi if [ "$EXECUTIONMODE" = "local" ]; then - if [[ ! ${FLINK_TM_HEAP} =~ $IS_NUMBER ]]; then - echo "[ERROR] Configured JobManager JVM heap size is not a number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE." 
+ if [[ ! ${FLINK_TM_HEAP} =~ $IS_NUMBER ]] || [[ "${FLINK_TM_HEAP}" -lt "0" ]]; then + echo "[ERROR] Configured TaskManager memory size is not a valid value. Please set ${KEY_TASKM_MEM_SIZE} in ${FLINK_CONF_FILE}." exit 1 fi FLINK_JM_HEAP=`expr $FLINK_JM_HEAP + $FLINK_TM_HEAP` fi - if [ "$FLINK_JM_HEAP" -gt 0 ]; then + if [ "${FLINK_JM_HEAP}" -gt "0" ]; then export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP"m -Xmx"$FLINK_JM_HEAP"m" fi http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-dist/src/main/flink-bin/bin/taskmanager.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh index c41270d..f5aecc6 100755 --- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh +++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh @@ -51,13 +51,43 @@ if [[ $STARTSTOP == "start" ]]; then fi fi - if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then - echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE." + if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" -lt "0" ]]; then + echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}." 
exit 1 fi - if [ "$FLINK_TM_HEAP" -gt 0 ]; then - export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m -Xmx"$FLINK_TM_HEAP"m" + if [ "${FLINK_TM_HEAP}" -gt "0" ]; then + + TM_HEAP_SIZE=${FLINK_TM_HEAP} + TM_OFFHEAP_SIZE=0 + # some space for Netty initialization + NETTY_BUFFERS=1 + + if [[ "${STREAMINGMODE}" == "batch" ]] && useOffHeapMemory; then + if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then + # We split up the total memory in heap and off-heap memory + if [[ "${FLINK_TM_HEAP}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then + echo "[ERROR] Configured TaskManager memory size ('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size ('${KEY_TASKM_MEM_MANAGED_SIZE}')." + exit 1 + fi + TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE} + TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE)) + else + # We calculate the memory using a fraction of the total memory + if [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} >= 1.0"` != "0" ]] || [[ `bc -l <<< "${FLINK_TM_MEM_MANAGED_FRACTION} <= 0.0"` != "0" ]]; then + echo "[ERROR] Configured TaskManager managed memory fraction is not a valid value. 
Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}" + exit 1 + fi + # recalculate the JVM heap memory by taking the off-heap ratio into account + TM_OFFHEAP_SIZE=`printf '%.0f\n' $(bc -l <<< "${FLINK_TM_HEAP} * ${FLINK_TM_MEM_MANAGED_FRACTION}")` + TM_HEAP_SIZE=$((FLINK_TM_HEAP - TM_OFFHEAP_SIZE)) + fi + fi + + TM_HEAP_SIZE=$((TM_HEAP_SIZE - FLINK_TM_MEM_NETWORK_SIZE - NETTY_BUFFERS)) + echo export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=$((TM_OFFHEAP_SIZE + FLINK_TM_MEM_NETWORK_SIZE + NETTY_BUFFERS))M" + export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=$((TM_OFFHEAP_SIZE + FLINK_TM_MEM_NETWORK_SIZE + NETTY_BUFFERS))M" + fi # Startup parameters http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java index 209d925..641d13e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java @@ -196,10 +196,11 @@ public class NetworkBufferPool implements BufferPoolFactory { throw new IOException(String.format("Insufficient number of network buffers: " + "required %d, but only %d available. The total number of network " + "buffers is currently set to %d. 
You can increase this " + - "number by setting the configuration key '" + - ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY + "'.", - numRequiredBuffers, totalNumberOfMemorySegments - numTotalRequiredBuffers, - totalNumberOfMemorySegments)); + "number by setting the configuration key '%s'.", + numRequiredBuffers, + totalNumberOfMemorySegments - numTotalRequiredBuffers, + totalNumberOfMemorySegments, + ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)); } this.numTotalRequiredBuffers += numRequiredBuffers; http://git-wip-us.apache.org/repos/asf/flink/blob/76a40d59/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index f11b933..1563a7a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1578,7 +1578,7 @@ object TaskManager { LOG.info(s"Using $configuredMemory MB for Flink managed memory.") configuredMemory << 20 // megabytes to bytes } - else if (memType == MemoryType.HEAP) { + else { val fraction = configuration.getFloat( ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION) @@ -1586,32 +1586,53 @@ object TaskManager { ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, "MemoryManager fraction of the free memory must be between 0.0 and 1.0") - val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * - fraction).toLong + if (memType == MemoryType.HEAP) { + val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * + fraction).toLong - LOG.info(s"Using $fraction of the currently free heap space for Flink managed " + - s" heap memory 
(${relativeMemSize >> 20} MB).") + LOG.info(s"Using $fraction of the currently free heap space for Flink managed " + + s" heap memory (${relativeMemSize >> 20} MB).") - relativeMemSize - } - else { - val ratio = configuration.getFloat( - ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY, - ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_OFF_HEAP_RATIO) - - checkConfigParameter(ratio > 0.0f, - ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_RATIO_KEY, - "MemoryManager ratio (off-heap memory / heap size) must be larger than zero") - - val maxHeapSize = EnvironmentInformation.getMaxJvmHeapMemory() - val relativeMemSize = (maxHeapSize * ratio).toLong + relativeMemSize + } + else if (memType == MemoryType.OFF_HEAP) { + + val networkBufferSizeNew = configuration.getLong( + ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE) + + val networkBufferSizeOld = configuration.getLong( + ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1) + val networkBufferSize = + if (networkBufferSizeNew != -1) { + networkBufferSizeNew + } else if (networkBufferSizeOld == -1) { + ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE + } else { + networkBufferSizeOld + } + + val numNetworkBuffers = configuration.getLong( + ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) + + // direct memory for Netty's off-heap buffers + val networkMemory = (numNetworkBuffers * networkBufferSize) + (1 << 20) - LOG.info(s"Using $ratio time the heap size (${maxHeapSize} bytes) for Flink " + - s"managed off-heap memory (${relativeMemSize >> 20} MB).") + // The maximum heap memory has been adjusted according to the fraction + val maxMemory = EnvironmentInformation.getMaxJvmHeapMemory() + networkMemory + val directMemorySize = (maxMemory / (1.0 - fraction) * fraction).toLong - relativeMemSize + LOG.info(s"Using $fraction of the maximum memory size for " + + s"Flink managed off-heap 
memory (${directMemorySize >> 20} MB).") + + directMemorySize + } + else { + throw new RuntimeException("No supported memory type detected.") + } } - + val preAllocateMemory: Boolean = streamingMode == StreamingMode.BATCH_ONLY // now start the memory manager