[
https://issues.apache.org/jira/browse/FLINK-4545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15973044#comment-15973044
]
ASF GitHub Bot commented on FLINK-4545:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/3721#discussion_r112002748
--- Diff: flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java ---
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dist;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Random;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit test that verifies that the task manager heap size calculation used by the bash script
+ * <tt>taskmanager.sh</tt> returns the same values as the heap size calculation of
+ * {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
+ *
+ * NOTE: the shell script uses <tt>awk</tt> to perform floating-point arithmetic which uses
+ * <tt>double</tt> precision but our Java code restrains to <tt>float</tt> because we actually do
+ * not need high precision.
+ */
+public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
+
+ /** Key that is used by <tt>config.sh</tt>. */
+ private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
+
+ /**
+ * Number of tests with random values.
+ *
+ * NOTE: calling the external test script is slow and thus low numbers are preferred for general
+ * testing.
+ */
+ private static final int NUM_RANDOM_TESTS = 20;
+
+ @Before
+ public void checkOperatingSystem() {
+ Assume.assumeTrue("This test checks shell scripts not available on Windows.",
+ !OperatingSystem.isWindows());
+ }
+
+ /**
+ * Tests that {@link TaskManagerServices#calculateNetworkBuf(long, Configuration)} has the same
+ * result as the shell script.
+ */
+ @Test
+ public void compareNetworkBufShellScriptWithJava() throws Exception {
+ int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+ float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
+
+ // manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
+
+ compareNetworkBufJavaVsScript(
+ getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
+
+ compareNetworkBufJavaVsScript(
+ getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
+
+ compareNetworkBufJavaVsScript(
+ getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
+
+ // some automated tests with random (but valid) values
+
+ Random ran = new Random();
+ for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
+ // tolerate that values differ by 1% (due to different floating point precisions)
+ compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f);
--- End diff --
As with the other randomized tests, we should print the configuration used for
the test in case of failure.
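
One possible shape for this, as a minimal sketch only: the class, the map-based
stand-in configuration, and the helper names below are hypothetical and are not
taken from the pull request. The idea is to fix the random seed per run and to
put both the seed and the generated configuration into the assertion message, so
that a failing random case can be reproduced.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

/** Hypothetical stand-in for illustration; not part of the test under review. */
class RandomConfigFailureMessageSketch {

    /** Builds a random stand-in configuration (the keys are illustrative only). */
    static Map<String, String> randomConfig(Random ran) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("heap.mb", String.valueOf(1 + ran.nextInt(4096)));
        config.put("network.fraction", String.valueOf(ran.nextFloat()));
        return config;
    }

    /**
     * Checks that two values agree within a relative tolerance. On failure,
     * the error message carries the seed and the configuration that was used.
     */
    static void assertClose(
            long expected, long actual, float tolerance, long seed, Map<String, String> config) {
        double allowed = Math.abs(expected) * (double) tolerance;
        if (Math.abs(expected - actual) > allowed) {
            throw new AssertionError(
                "expected " + expected + " but was " + actual
                    + " (seed=" + seed + ", config=" + config + ")");
        }
    }

    public static void main(String[] args) {
        // Choose the seed explicitly so that it can be reported and reused.
        long seed = System.nanoTime();
        Map<String, String> config = randomConfig(new Random(seed));
        try {
            // Deliberately mismatching values to show the failure message.
            assertClose(100L, 200L, 0.01f, seed, config);
        } catch (AssertionError e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In a JUnit test the same effect could be had by passing such a message as the
reason argument of the assertion, rather than throwing the error by hand.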
> Flink automatically manages TM network buffer
> ---------------------------------------------
>
> Key: FLINK-4545
> URL: https://issues.apache.org/jira/browse/FLINK-4545
> Project: Flink
> Issue Type: Wish
> Components: Network
> Reporter: Zhenzhong Xu
>
> Currently, the number of network buffers per task manager is preconfigured and
> the memory is pre-allocated through the taskmanager.network.numberOfBuffers
> config. In a job DAG with a shuffle phase, this number can grow very high
> depending on the TM cluster size. The formula for calculating the buffer count
> is documented here
> (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers).
>
> #slots-per-TM^2 * #TMs * 4
> In a standalone deployment, we may need to control the task manager cluster
> size dynamically and then leverage the upcoming Flink feature to support
> scaling job parallelism/rescaling at runtime.
> If the buffer count config is static at runtime and cannot be changed without
> restarting the task manager process, this may add latency and complexity to the
> scaling process. I am wondering whether there has already been any discussion
> about having Flink manage the network buffers automatically, or at least
> expose some API to allow them to be reconfigured. Let me know if there is
> any existing JIRA that I should follow.
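
To make the formula quoted in the issue description concrete, here is a small
worked example. It is a sketch under stated assumptions: the class and method
names are hypothetical, the cluster size (4 slots per TM, 20 TMs) is made up for
illustration, and the 32 KiB buffer size is assumed rather than taken from the
issue text.

```java
/** Hypothetical helper for illustration; not Flink API. */
class NetworkBufferEstimateSketch {

    /** Evaluates #slots-per-TM^2 * #TMs * 4 from the issue description. */
    static long requiredBuffers(int slotsPerTaskManager, int taskManagers) {
        return (long) slotsPerTaskManager * slotsPerTaskManager * taskManagers * 4L;
    }

    /** Memory needed for the given number of buffers of a fixed size. */
    static long requiredBytes(long buffers, int bufferSizeBytes) {
        return buffers * bufferSizeBytes;
    }

    public static void main(String[] args) {
        int slotsPerTaskManager = 4;   // assumed example value
        int taskManagers = 20;         // assumed example value
        int bufferSizeBytes = 32 * 1024; // assumed buffer size of 32 KiB

        long buffers = requiredBuffers(slotsPerTaskManager, taskManagers);
        long bytes = requiredBytes(buffers, bufferSizeBytes);

        // 4^2 * 20 * 4 = 1280 buffers, i.e. 40 MiB at 32 KiB per buffer
        System.out.println(buffers + " buffers, " + (bytes >> 20) + " MiB");
    }
}
```

Because the slot count enters quadratically, doubling the slots per TM in this
example to 8 would quadruple the estimate to 5120 buffers, which illustrates why
a value fixed at startup is awkward when the cluster is resized.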