This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 3896bbe61 [AMORO-3329] Add MemorySize for parsing memory value and
unit (#3409)
3896bbe61 is described below
commit 3896bbe6188533d7357d5e58222d4c914add4de1
Author: Jzjsnow <[email protected]>
AuthorDate: Tue Jan 21 14:08:58 2025 +0800
[AMORO-3329] Add MemorySize for parsing memory value and unit (#3409)
* [AMORO-3329] Add MemorySize for parsing memory value and unit
* fixup! [AMORO-3329] Add MemorySize for parsing memory value and unit
---------
Co-authored-by: jzjsnow <[email protected]>
Co-authored-by: ZhouJinsong <[email protected]>
---
.../server/manager/FlinkOptimizerContainer.java | 31 +-
amoro-ams/src/main/resources/mysql/upgrade.sql | 8 +
amoro-ams/src/main/resources/postgres/upgrade.sql | 36 +-
.../manager/TestFlinkOptimizerContainer.java | 45 ++-
.../java/org/apache/amoro/utils/MemorySize.java | 413 +++++++++++++++++++++
.../org/apache/amoro/utils/MemorySizeTest.java | 245 ++++++++++++
6 files changed, 746 insertions(+), 32 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
index 314f13789..123a9dd12 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
@@ -29,6 +29,7 @@ import
org.apache.amoro.shade.guava32.com.google.common.base.Supplier;
import org.apache.amoro.shade.guava32.com.google.common.base.Suppliers;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.amoro.utils.MemorySize;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
@@ -228,9 +229,11 @@ public class FlinkOptimizerContainer extends
AbstractResourceContainer {
properties, resourceFlinkConf,
FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY);
resourceFlinkConf.putToOptions(
- FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY, jobManagerMemory +
"m");
+ FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY,
+ MemorySize.ofMebiBytes(jobManagerMemory).toString());
resourceFlinkConf.putToOptions(
- FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY, taskManagerMemory +
"m");
+ FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY,
+ MemorySize.ofMebiBytes(taskManagerMemory).toString());
resourceFlinkConf.putToOptions(
FlinkConfKeys.YARN_APPLICATION_JOB_NAME,
String.join(
@@ -354,28 +357,8 @@ public class FlinkOptimizerContainer extends
AbstractResourceContainer {
if (memoryStr == null || memoryStr.isEmpty()) {
return 0;
}
- memoryStr = memoryStr.toLowerCase().trim().replaceAll("\\s", "");
- Matcher matcher = Pattern.compile("(\\d+)([mg])").matcher(memoryStr);
- if (matcher.matches()) {
- long size = Long.parseLong(matcher.group(1));
- String unit = matcher.group(2);
- switch (unit) {
- case "m":
- return size;
- case "g":
- return size * 1024;
- default:
- LOG.error("Invalid memory size unit: {}, Please use m or g as the
unit", unit);
- return 0;
- }
- } else {
- try {
- return Long.parseLong(memoryStr);
- } catch (NumberFormatException e) {
- LOG.error("Invalid memory size format: {}", memoryStr);
- return 0;
- }
- }
+
+ return MemorySize.parse(memoryStr).getMebiBytes();
}
private <T> T fetchCommandOutput(Process exec, Function<String, T>
commandReader) {
diff --git a/amoro-ams/src/main/resources/mysql/upgrade.sql
b/amoro-ams/src/main/resources/mysql/upgrade.sql
index f632c01c3..774a9b0b5 100644
--- a/amoro-ams/src/main/resources/mysql/upgrade.sql
+++ b/amoro-ams/src/main/resources/mysql/upgrade.sql
@@ -67,3 +67,11 @@ CREATE TABLE `http_session` (
PRIMARY KEY(`session_id`, `context_path`, `virtual_host`),
KEY `idx_session_expiry` (`expiry_time`)
) ENGINE = InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Http session store'
ROW_FORMAT=DYNAMIC;
+
-- Update the resource group memory unit.
-- Bare numbers used to be read as MiB by the optimizer container, but MemorySize interprets a
-- unit-less value as bytes, so append an explicit 'MB' suffix to every purely numeric value.
-- Surrounding spaces are trimmed because the previous parser ignored whitespace in these values.
UPDATE resource_group
SET properties = JSON_SET(
    properties,
    '$."flink-conf.jobmanager.memory.process.size"',
    CONCAT(TRIM(JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.jobmanager.memory.process.size"'))), 'MB'))
WHERE TRIM(JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.jobmanager.memory.process.size"'))) REGEXP '^[0-9]+$';

UPDATE resource_group
SET properties = JSON_SET(
    properties,
    '$."flink-conf.taskmanager.memory.process.size"',
    CONCAT(TRIM(JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.taskmanager.memory.process.size"'))), 'MB'))
WHERE TRIM(JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.taskmanager.memory.process.size"'))) REGEXP '^[0-9]+$';

-- Update the resource memory unit (same conversion as for resource groups above).
UPDATE resource
SET properties = JSON_SET(
    properties,
    '$."flink-conf.jobmanager.memory.process.size"',
    CONCAT(TRIM(JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.jobmanager.memory.process.size"'))), 'MB'))
WHERE TRIM(JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.jobmanager.memory.process.size"'))) REGEXP '^[0-9]+$';

UPDATE resource
SET properties = JSON_SET(
    properties,
    '$."flink-conf.taskmanager.memory.process.size"',
    CONCAT(TRIM(JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.taskmanager.memory.process.size"'))), 'MB'))
WHERE TRIM(JSON_UNQUOTE(JSON_EXTRACT(properties, '$."flink-conf.taskmanager.memory.process.size"'))) REGEXP '^[0-9]+$';
diff --git a/amoro-ams/src/main/resources/postgres/upgrade.sql
b/amoro-ams/src/main/resources/postgres/upgrade.sql
index 225cdbd49..5d4387422 100644
--- a/amoro-ams/src/main/resources/postgres/upgrade.sql
+++ b/amoro-ams/src/main/resources/postgres/upgrade.sql
@@ -84,4 +84,38 @@ COMMENT ON COLUMN http_session.last_save_time IS 'Last save
time';
COMMENT ON COLUMN http_session.expiry_time IS 'Expiry time';
COMMENT ON COLUMN http_session.max_interval IS 'Max internal';
COMMENT ON COLUMN http_session.data_store IS 'Session data store';
-COMMENT ON TABLE http_session IS 'Http session store';
\ No newline at end of file
+COMMENT ON TABLE http_session IS 'Http session store';
+
-- Update the resource group memory unit.
-- Bare numbers used to be read as MiB by the optimizer container, but MemorySize interprets a
-- unit-less value as bytes, so append an explicit 'MB' suffix to every purely numeric value.
-- Surrounding whitespace is trimmed because the previous parser ignored it.
--
-- Notes on the jsonb calls:
--  * The property keys contain dots, so the jsonb_set path is a single-element array holding the
--    whole key. A path like '{flink-conf,jobmanager,...}' addresses a nested object that does not
--    exist, and jsonb_set then returns the document unchanged.
--  * jsonb_set expects a jsonb replacement value, hence to_jsonb(...) around the text.
--  * properties is cast to jsonb so the statements work for both TEXT and JSONB columns.
UPDATE resource_group
SET properties = jsonb_set(
        properties::jsonb,
        '{flink-conf.jobmanager.memory.process.size}',
        to_jsonb(btrim(properties::jsonb ->> 'flink-conf.jobmanager.memory.process.size') || 'MB')
    )
WHERE btrim(properties::jsonb ->> 'flink-conf.jobmanager.memory.process.size') ~ '^[0-9]+$';

UPDATE resource_group
SET properties = jsonb_set(
        properties::jsonb,
        '{flink-conf.taskmanager.memory.process.size}',
        to_jsonb(btrim(properties::jsonb ->> 'flink-conf.taskmanager.memory.process.size') || 'MB')
    )
WHERE btrim(properties::jsonb ->> 'flink-conf.taskmanager.memory.process.size') ~ '^[0-9]+$';

-- Update the resource memory unit (same conversion as for resource groups above).
UPDATE resource
SET properties = jsonb_set(
        properties::jsonb,
        '{flink-conf.jobmanager.memory.process.size}',
        to_jsonb(btrim(properties::jsonb ->> 'flink-conf.jobmanager.memory.process.size') || 'MB')
    )
WHERE btrim(properties::jsonb ->> 'flink-conf.jobmanager.memory.process.size') ~ '^[0-9]+$';

UPDATE resource
SET properties = jsonb_set(
        properties::jsonb,
        '{flink-conf.taskmanager.memory.process.size}',
        to_jsonb(btrim(properties::jsonb ->> 'flink-conf.taskmanager.memory.process.size') || 'MB')
    )
WHERE btrim(properties::jsonb ->> 'flink-conf.taskmanager.memory.process.size') ~ '^[0-9]+$';
\ No newline at end of file
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
index 0862968b4..7e0e03d4a 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
@@ -42,17 +42,36 @@ public class TestFlinkOptimizerContainer {
@Test
public void testParseMemorySize() {
- Assert.assertEquals(100, container.parseMemorySize("100"));
+ Assert.assertEquals(0, container.parseMemorySize("100"));
+ Assert.assertEquals(0, container.parseMemorySize("100k"));
+ Assert.assertEquals(0, container.parseMemorySize("100kb"));
+ Assert.assertEquals(0, container.parseMemorySize("100 k"));
+ Assert.assertEquals(0, container.parseMemorySize("100 kb"));
+ Assert.assertEquals(0, container.parseMemorySize("100K"));
+ Assert.assertEquals(0, container.parseMemorySize("100KB"));
+ Assert.assertEquals(0, container.parseMemorySize("100 K"));
+ Assert.assertEquals(0, container.parseMemorySize("100 KB"));
Assert.assertEquals(100, container.parseMemorySize("100m"));
+ Assert.assertEquals(100, container.parseMemorySize("100mb"));
Assert.assertEquals(100, container.parseMemorySize("100 m"));
+ Assert.assertEquals(100, container.parseMemorySize("100 mb"));
Assert.assertEquals(100, container.parseMemorySize("100M"));
+ Assert.assertEquals(100, container.parseMemorySize("100MB"));
Assert.assertEquals(100, container.parseMemorySize("100 M"));
+ Assert.assertEquals(100, container.parseMemorySize("100 MB"));
Assert.assertEquals(102400, container.parseMemorySize("100g"));
+ Assert.assertEquals(102400, container.parseMemorySize("100gb"));
Assert.assertEquals(102400, container.parseMemorySize("100 g"));
+ Assert.assertEquals(102400, container.parseMemorySize("100 gb"));
Assert.assertEquals(102400, container.parseMemorySize("100G"));
+ Assert.assertEquals(102400, container.parseMemorySize("100GB"));
Assert.assertEquals(102400, container.parseMemorySize("100 G"));
- Assert.assertEquals(0, container.parseMemorySize("G100G"));
- Assert.assertEquals(0, container.parseMemorySize("100kb"));
+ Assert.assertEquals(102400, container.parseMemorySize("100 GB"));
+ try {
+ Assert.assertEquals(0, container.parseMemorySize("G100G"));
+ } catch (NumberFormatException e) {
+ Assert.assertEquals("text does not start with a number", e.getMessage());
+ }
}
@Test
@@ -110,11 +129,11 @@ public class TestFlinkOptimizerContainer {
FlinkOptimizerContainer.FlinkConf.buildFor(prop,
Maps.newHashMap()).build();
Assert.assertEquals(
- 100L,
+ 0,
container.getMemorySizeValue(
prop, conf,
FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
- 100L,
+ 0,
container.getMemorySizeValue(
prop, conf,
FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
@@ -124,7 +143,7 @@ public class TestFlinkOptimizerContainer {
conf = FlinkOptimizerContainer.FlinkConf.buildFor(prop,
containerProperties).build();
prop.clear();
Assert.assertEquals(
- 200L,
+ 0,
container.getMemorySizeValue(
prop, conf,
FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
@@ -143,7 +162,7 @@ public class TestFlinkOptimizerContainer {
container.getMemorySizeValue(
prop, conf,
FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
Assert.assertEquals(
- 300L,
+ 0,
container.getMemorySizeValue(
prop, conf,
FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
@@ -157,5 +176,17 @@ public class TestFlinkOptimizerContainer {
0L,
container.getMemorySizeValue(
prop, conf,
FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
+
+
prop.put(FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY,
"400 MB");
+
prop.put(FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY,
"400 MB");
+ conf = FlinkOptimizerContainer.FlinkConf.buildFor(prop,
Maps.newHashMap()).build();
+ Assert.assertEquals(
+ 400L,
+ container.getMemorySizeValue(
+ prop, conf,
FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
+ Assert.assertEquals(
+ 400L,
+ container.getMemorySizeValue(
+ prop, conf,
FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
}
}
diff --git a/amoro-common/src/main/java/org/apache/amoro/utils/MemorySize.java
b/amoro-common/src/main/java/org/apache/amoro/utils/MemorySize.java
new file mode 100644
index 000000000..d53053375
--- /dev/null
+++ b/amoro-common/src/main/java/org/apache/amoro/utils/MemorySize.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.utils;
+
+import static
org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.amoro.utils.MemorySize.MemoryUnit.BYTES;
+import static org.apache.amoro.utils.MemorySize.MemoryUnit.GIGA_BYTES;
+import static org.apache.amoro.utils.MemorySize.MemoryUnit.KILO_BYTES;
+import static org.apache.amoro.utils.MemorySize.MemoryUnit.MEGA_BYTES;
+import static org.apache.amoro.utils.MemorySize.MemoryUnit.TERA_BYTES;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
/**
 * MemorySize is a representation of a number of bytes, viewable in different units.
 *
 * <h2>Parsing</h2>
 *
 * <p>The size can be parsed from a text expression consisting of a non-negative integer that is
 * optionally followed by a unit (see {@link MemoryUnit} for the accepted unit strings). Units are
 * matched case-insensitively and whitespace between the number and the unit is allowed. If the
 * expression is a pure number, the value will be interpreted as bytes.
 *
 * <h2>Formatting</h2>
 *
 * <p>{@link #toString()} renders the size in the largest unit that represents it exactly, e.g.
 * {@code "100 mb"}. The output uses ASCII digits regardless of the default locale and can be parsed
 * back with {@link #parse(String)}.
 *
 * <p>This class was originally copied from <a href="https://github.com/apache/flink">Apache
 * Flink</a>. For any inquiries regarding the original code, please refer to <a
 * href="https://github.com/apache/flink/blob/master/flink-core-api/src/main/java/org/apache/flink/configuration/MemorySize.java">org.apache.flink.configuration.MemorySize</a>.
 */
public class MemorySize implements java.io.Serializable, Comparable<MemorySize> {

  private static final long serialVersionUID = 1L;

  public static final MemorySize ZERO = new MemorySize(0L);

  public static final MemorySize MAX_VALUE = new MemorySize(Long.MAX_VALUE);

  /** All units, ordered from the smallest to the largest multiplier. */
  private static final List<MemoryUnit> ORDERED_UNITS =
      Arrays.asList(
          MemoryUnit.BYTES,
          MemoryUnit.KILO_BYTES,
          MemoryUnit.MEGA_BYTES,
          MemoryUnit.GIGA_BYTES,
          MemoryUnit.TERA_BYTES);

  // ------------------------------------------------------------------------

  /** The memory size, in bytes. */
  private final long bytes;

  /** The memoized value returned by toString(). */
  private transient String stringified;

  /** The memoized value returned by toHumanReadableString(). */
  private transient String humanReadableStr;

  /**
   * Constructs a new MemorySize.
   *
   * @param bytes The size, in bytes. Must be zero or larger.
   * @throws IllegalArgumentException if {@code bytes} is negative
   */
  public MemorySize(long bytes) {
    requireArgument(bytes >= 0, "bytes must be >= 0");
    this.bytes = bytes;
  }

  /**
   * Creates a MemorySize from a number of mebibytes.
   *
   * @param mebiBytes the size in mebibytes (1 MiB = 2^20 bytes)
   * @return the corresponding MemorySize
   * @throws IllegalArgumentException if the resulting size is negative
   * @throws ArithmeticException if the size in bytes does not fit into a {@code long}
   */
  public static MemorySize ofMebiBytes(long mebiBytes) {
    // multiplyExact instead of "mebiBytes << 20": the shift silently wraps around for large inputs
    // (e.g. 2^44 MiB used to become 0 bytes).
    return new MemorySize(Math.multiplyExact(mebiBytes, MemoryUnit.MEGA_BYTES.getMultiplier()));
  }

  // ------------------------------------------------------------------------

  /** Gets the memory size in bytes. */
  public long getBytes() {
    return bytes;
  }

  /** Gets the memory size in Kibibytes (= 1024 bytes), rounded down. */
  public long getKibiBytes() {
    return bytes >> 10;
  }

  /**
   * Gets the memory size in Mebibytes (= 1024 Kibibytes), rounded down.
   *
   * @throws ArithmeticException if the number of mebibytes does not fit into an {@code int}
   */
  public int getMebiBytes() {
    // The int return type is kept for API compatibility; fail loudly instead of truncating, which
    // turned sizes of 2 PiB and above into wrong (even negative) values.
    return Math.toIntExact(bytes >> 20);
  }

  /** Gets the memory size in Gibibytes (= 1024 Mebibytes), rounded down. */
  public long getGibiBytes() {
    return bytes >> 30;
  }

  /** Gets the memory size in Tebibytes (= 1024 Gibibytes), rounded down. */
  public long getTebiBytes() {
    return bytes >> 40;
  }

  // ------------------------------------------------------------------------

  @Override
  public int hashCode() {
    return Long.hashCode(bytes);
  }

  @Override
  public boolean equals(Object obj) {
    return obj == this
        || (obj != null
            && obj.getClass() == this.getClass()
            && ((MemorySize) obj).bytes == this.bytes);
  }

  /**
   * Returns the size in the largest unit that represents it exactly, e.g. {@code "1 gb"} or {@code
   * "1025 bytes"}. The result can be parsed back with {@link #parse(String)}.
   */
  @Override
  public String toString() {
    if (stringified == null) {
      stringified = formatToString();
    }
    return stringified;
  }

  private String formatToString() {
    // Walk the units from small to large and keep the last one that divides the size exactly.
    // Zero is rendered in bytes; non-zero exact multiples of the largest unit use that unit.
    MemoryUnit highestIntegerUnit = MemoryUnit.BYTES;
    if (bytes != 0) {
      for (MemoryUnit candidate : ORDERED_UNITS) {
        if (bytes % candidate.getMultiplier() != 0) {
          break;
        }
        highestIntegerUnit = candidate;
      }
    }
    // Locale.ROOT keeps the digits ASCII: the text may be written into configuration values that
    // other parsers read back, so it must not depend on the JVM's default locale.
    return String.format(
        Locale.ROOT,
        "%d %s",
        bytes / highestIntegerUnit.getMultiplier(),
        highestIntegerUnit.units[1]);
  }

  /**
   * Returns an approximate, human-friendly rendering such as {@code "1.500kb (1536 bytes)"}. Sizes
   * up to 1024 bytes are rendered as plain bytes.
   */
  public String toHumanReadableString() {
    if (humanReadableStr == null) {
      humanReadableStr = formatToHumanReadableString();
    }
    return humanReadableStr;
  }

  private String formatToHumanReadableString() {
    // The largest unit whose multiplier is strictly smaller than the size (multipliers increase
    // along ORDERED_UNITS, so the last match wins).
    MemoryUnit highestUnit = MemoryUnit.BYTES;
    for (MemoryUnit candidate : ORDERED_UNITS) {
      if (bytes > candidate.getMultiplier()) {
        highestUnit = candidate;
      }
    }

    if (highestUnit == MemoryUnit.BYTES) {
      return String.format(Locale.ROOT, "%d %s", bytes, MemoryUnit.BYTES.units[1]);
    }
    double approximate = 1.0 * bytes / highestUnit.getMultiplier();
    return String.format(
        Locale.ROOT, "%.3f%s (%d bytes)", approximate, highestUnit.units[1], bytes);
  }

  @Override
  public int compareTo(MemorySize that) {
    return Long.compare(this.bytes, that.bytes);
  }

  // ------------------------------------------------------------------------
  //  Calculations
  // ------------------------------------------------------------------------

  /**
   * Returns the sum of both sizes.
   *
   * @throws ArithmeticException on {@code long} overflow
   */
  public MemorySize add(MemorySize that) {
    return new MemorySize(Math.addExact(this.bytes, that.bytes));
  }

  /**
   * Returns this size minus {@code that}.
   *
   * @throws IllegalArgumentException if the result would be negative
   */
  public MemorySize subtract(MemorySize that) {
    return new MemorySize(Math.subtractExact(this.bytes, that.bytes));
  }

  /**
   * Multiplies this size by a non-negative factor; the fractional part of the product is dropped.
   *
   * @throws IllegalArgumentException if {@code multiplier} is negative
   * @throws ArithmeticException if the product exceeds {@link Long#MAX_VALUE}
   */
  public MemorySize multiply(double multiplier) {
    requireArgument(multiplier >= 0, "multiplier must be >= 0");

    BigDecimal product = BigDecimal.valueOf(this.bytes).multiply(BigDecimal.valueOf(multiplier));
    if (product.compareTo(BigDecimal.valueOf(Long.MAX_VALUE)) > 0) {
      throw new ArithmeticException("long overflow");
    }
    return new MemorySize(product.longValue());
  }

  /**
   * Divides this size by a positive divisor, rounding down.
   *
   * @throws IllegalArgumentException if {@code by} is zero or negative
   */
  public MemorySize divide(long by) {
    // Zero is rejected here; the previous ">= 0" check let it through to an ArithmeticException.
    requireArgument(by > 0, "divisor must be > 0");
    return new MemorySize(bytes / by);
  }

  // ------------------------------------------------------------------------
  //  Parsing
  // ------------------------------------------------------------------------

  /**
   * Parses the given string as a MemorySize.
   *
   * @param text The string to parse
   * @return The parsed MemorySize
   * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
   */
  public static MemorySize parse(String text) throws IllegalArgumentException {
    return new MemorySize(parseBytes(text));
  }

  /**
   * Parses the given string, applying {@code defaultUnit} when the text carries no unit.
   *
   * @param text The string to parse.
   * @param defaultUnit the unit used for a bare number.
   * @return The parsed MemorySize.
   * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
   */
  public static MemorySize parse(String text, MemoryUnit defaultUnit)
      throws IllegalArgumentException {
    if (!MemoryUnit.hasUnit(text)) {
      return parse(text + defaultUnit.units[0]);
    }
    return parse(text);
  }

  /**
   * Parses the given string as bytes. The supported unit strings are listed under {@link
   * MemoryUnit}; a bare number is interpreted as bytes.
   *
   * @param text The string to parse
   * @return The parsed size, in bytes.
   * @throws IllegalArgumentException Thrown, if the expression cannot be parsed (a {@link
   *     NumberFormatException} if it does not start with a digit).
   */
  public static long parseBytes(String text) throws IllegalArgumentException {
    final String trimmed = trimNonBlank(text);
    final int numberEnd = leadingDigitCount(trimmed);

    final String number = trimmed.substring(0, numberEnd);
    final String unit = trimmed.substring(numberEnd).trim().toLowerCase(Locale.US);

    if (number.isEmpty()) {
      throw new NumberFormatException("text does not start with a number");
    }

    final long value;
    try {
      // The text consists of digits only, so this can fail only on overflow.
      value = Long.parseLong(number);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException(
          "The value '" + number + "' cannot be represented as 64bit number (numeric overflow).",
          e);
    }

    final long multiplier = parseUnit(unit).map(MemoryUnit::getMultiplier).orElse(1L);
    try {
      return Math.multiplyExact(value, multiplier);
    } catch (ArithmeticException e) {
      throw new IllegalArgumentException(
          "The value '"
              + text
              + "' cannot be represented as 64bit number of bytes (numeric overflow).",
          e);
    }
  }

  /**
   * Resolves a lower-cased unit string; empty means "no unit".
   *
   * @throws IllegalArgumentException if the string is non-empty and matches no unit
   */
  private static Optional<MemoryUnit> parseUnit(String unit) {
    for (MemoryUnit candidate : ORDERED_UNITS) {
      if (matchesAny(unit, candidate)) {
        return Optional.of(candidate);
      }
    }
    if (!unit.isEmpty()) {
      throw new IllegalArgumentException(
          "Memory size unit '"
              + unit
              + "' does not match any of the recognized units: "
              + MemoryUnit.getAllUnits());
    }
    return Optional.empty();
  }

  private static boolean matchesAny(String str, MemoryUnit unit) {
    return Arrays.asList(unit.units).contains(str);
  }

  /** Trims {@code text}, rejecting {@code null} and blank input. */
  private static String trimNonBlank(String text) {
    requireArgument(text != null, "text can't be null");
    final String trimmed = text.trim();
    requireArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");
    return trimmed;
  }

  /** Returns the length of the run of ASCII digits at the start of {@code text}. */
  private static int leadingDigitCount(String text) {
    int pos = 0;
    while (pos < text.length() && text.charAt(pos) >= '0' && text.charAt(pos) <= '9') {
      pos++;
    }
    return pos;
  }

  /** Throws an {@link IllegalArgumentException} with {@code message} if the condition is false. */
  private static void requireArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  /**
   * Enum which defines memory unit, mostly used to parse value from configuration file.
   *
   * <p>To make larger values more compact, the common size suffixes are supported:
   *
   * <ul>
   *   <li>1b or 1bytes (bytes)
   *   <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
   *   <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
   *   <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
   *   <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
   * </ul>
   */
  public enum MemoryUnit {
    BYTES(new String[] {"b", "bytes"}, 1L),
    KILO_BYTES(new String[] {"k", "kb", "kibibytes"}, 1024L),
    MEGA_BYTES(new String[] {"m", "mb", "mebibytes"}, 1024L * 1024L),
    GIGA_BYTES(new String[] {"g", "gb", "gibibytes"}, 1024L * 1024L * 1024L),
    TERA_BYTES(new String[] {"t", "tb", "tebibytes"}, 1024L * 1024L * 1024L * 1024L);

    /** Accepted (lower-case) unit strings; index 0 is the shortest form. */
    private final String[] units;

    private final long multiplier;

    MemoryUnit(String[] units, long multiplier) {
      this.units = units;
      this.multiplier = multiplier;
    }

    /** Returns a copy of the accepted unit strings, so callers cannot alter parsing. */
    public String[] getUnits() {
      return units.clone();
    }

    /** Returns the number of bytes in one of this unit. */
    public long getMultiplier() {
      return multiplier;
    }

    /** Returns all accepted unit strings, e.g. {@code "(b | bytes) / (k | kb | kibibytes) / ..."}. */
    public static String getAllUnits() {
      final MemoryUnit[] all = values();
      final String[] groups = new String[all.length];
      for (int i = 0; i < all.length; i++) {
        groups[i] = "(" + String.join(" | ", all[i].units) + ")";
      }
      return String.join(" / ", groups);
    }

    /**
     * Tells whether {@code text} has anything after its leading number.
     *
     * @throws IllegalArgumentException if {@code text} is null or blank
     */
    public static boolean hasUnit(String text) {
      final String trimmed = trimNonBlank(text);
      return !trimmed.substring(leadingDigitCount(trimmed)).trim().isEmpty();
    }
  }
}
diff --git
a/amoro-common/src/test/java/org/apache/amoro/utils/MemorySizeTest.java
b/amoro-common/src/test/java/org/apache/amoro/utils/MemorySizeTest.java
new file mode 100644
index 000000000..7838e831c
--- /dev/null
+++ b/amoro-common/src/test/java/org/apache/amoro/utils/MemorySizeTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.utils;
+
+import static org.apache.amoro.utils.MemorySize.MemoryUnit.MEGA_BYTES;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+/** Tests for the {@link MemorySize} class. */
+public class MemorySizeTest {
+
+ @Test
+ public void testUnitConversion() {
+ final MemorySize zero = MemorySize.ZERO;
+ assertEquals(0, zero.getBytes());
+ assertEquals(0, zero.getKibiBytes());
+ assertEquals(0, zero.getMebiBytes());
+ assertEquals(0, zero.getGibiBytes());
+ assertEquals(0, zero.getTebiBytes());
+
+ final MemorySize bytes = new MemorySize(955);
+ assertEquals(955, bytes.getBytes());
+ assertEquals(0, bytes.getKibiBytes());
+ assertEquals(0, bytes.getMebiBytes());
+ assertEquals(0, bytes.getGibiBytes());
+ assertEquals(0, bytes.getTebiBytes());
+
+ final MemorySize kilos = new MemorySize(18500);
+ assertEquals(18500, kilos.getBytes());
+ assertEquals(18, kilos.getKibiBytes());
+ assertEquals(0, kilos.getMebiBytes());
+ assertEquals(0, kilos.getGibiBytes());
+ assertEquals(0, kilos.getTebiBytes());
+
+ final MemorySize megas = new MemorySize(15 * 1024 * 1024);
+ assertEquals(15_728_640, megas.getBytes());
+ assertEquals(15_360, megas.getKibiBytes());
+ assertEquals(15, megas.getMebiBytes());
+ assertEquals(0, megas.getGibiBytes());
+ assertEquals(0, megas.getTebiBytes());
+
+ final MemorySize teras = new MemorySize(2L * 1024 * 1024 * 1024 * 1024 +
10);
+ assertEquals(2199023255562L, teras.getBytes());
+ assertEquals(2147483648L, teras.getKibiBytes());
+ assertEquals(2097152, teras.getMebiBytes());
+ assertEquals(2048, teras.getGibiBytes());
+ assertEquals(2, teras.getTebiBytes());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalid() {
+ new MemorySize(-1);
+ }
+
+ @Test
+ public void testParseBytes() {
+ assertEquals(1234, MemorySize.parseBytes("1234"));
+ assertEquals(1234, MemorySize.parseBytes("1234b"));
+ assertEquals(1234, MemorySize.parseBytes("1234 b"));
+ assertEquals(1234, MemorySize.parseBytes("1234bytes"));
+ assertEquals(1234, MemorySize.parseBytes("1234 bytes"));
+ }
+
+ @Test
+ public void testParseKibiBytes() {
+ assertEquals(667766, MemorySize.parse("667766k").getKibiBytes());
+ assertEquals(667766, MemorySize.parse("667766 k").getKibiBytes());
+ assertEquals(667766, MemorySize.parse("667766kb").getKibiBytes());
+ assertEquals(667766, MemorySize.parse("667766 kb").getKibiBytes());
+ assertEquals(667766, MemorySize.parse("667766kibibytes").getKibiBytes());
+ assertEquals(667766, MemorySize.parse("667766 kibibytes").getKibiBytes());
+ }
+
+ @Test
+ public void testParseMebiBytes() {
+ assertEquals(7657623, MemorySize.parse("7657623m").getMebiBytes());
+ assertEquals(7657623, MemorySize.parse("7657623 m").getMebiBytes());
+ assertEquals(7657623, MemorySize.parse("7657623mb").getMebiBytes());
+ assertEquals(7657623, MemorySize.parse("7657623 mb").getMebiBytes());
+ assertEquals(7657623, MemorySize.parse("7657623mebibytes").getMebiBytes());
+ assertEquals(7657623, MemorySize.parse("7657623
mebibytes").getMebiBytes());
+ }
+
+ @Test
+ public void testParseGibiBytes() {
+ assertEquals(987654, MemorySize.parse("987654g").getGibiBytes());
+ assertEquals(987654, MemorySize.parse("987654 g").getGibiBytes());
+ assertEquals(987654, MemorySize.parse("987654gb").getGibiBytes());
+ assertEquals(987654, MemorySize.parse("987654 gb").getGibiBytes());
+ assertEquals(987654, MemorySize.parse("987654gibibytes").getGibiBytes());
+ assertEquals(987654, MemorySize.parse("987654 gibibytes").getGibiBytes());
+ }
+
+ @Test
+ public void testParseTebiBytes() {
+ assertEquals(1234567, MemorySize.parse("1234567t").getTebiBytes());
+ assertEquals(1234567, MemorySize.parse("1234567 t").getTebiBytes());
+ assertEquals(1234567, MemorySize.parse("1234567tb").getTebiBytes());
+ assertEquals(1234567, MemorySize.parse("1234567 tb").getTebiBytes());
+ assertEquals(1234567, MemorySize.parse("1234567tebibytes").getTebiBytes());
+ assertEquals(1234567, MemorySize.parse("1234567
tebibytes").getTebiBytes());
+ }
+
+ @Test
+ public void testUpperCase() {
+ assertEquals(1L, MemorySize.parse("1 B").getBytes());
+ assertEquals(1L, MemorySize.parse("1 K").getKibiBytes());
+ assertEquals(1L, MemorySize.parse("1 M").getMebiBytes());
+ assertEquals(1L, MemorySize.parse("1 G").getGibiBytes());
+ assertEquals(1L, MemorySize.parse("1 T").getTebiBytes());
+ }
+
+ @Test
+ public void testTrimBeforeParse() {
+ assertEquals(155L, MemorySize.parseBytes(" 155 "));
+ assertEquals(155L, MemorySize.parseBytes(" 155 bytes "));
+ }
+
+ @Test
+ public void testParseInvalid() {
+ // null
+ try {
+ MemorySize.parseBytes(null);
+ fail("exception expected");
+ } catch (IllegalArgumentException ignored) {
+ }
+
+ // empty
+ try {
+ MemorySize.parseBytes("");
+ fail("exception expected");
+ } catch (IllegalArgumentException ignored) {
+ }
+
+ // blank
+ try {
+ MemorySize.parseBytes(" ");
+ fail("exception expected");
+ } catch (IllegalArgumentException ignored) {
+ }
+
+ // no number
+ try {
+ MemorySize.parseBytes("foobar or fubar or foo bazz");
+ fail("exception expected");
+ } catch (IllegalArgumentException ignored) {
+ }
+
+ // wrong unit
+ try {
+ MemorySize.parseBytes("16 gjah");
+ fail("exception expected");
+ } catch (IllegalArgumentException ignored) {
+ }
+
+ // multiple numbers
+ try {
+ MemorySize.parseBytes("16 16 17 18 bytes");
+ fail("exception expected");
+ } catch (IllegalArgumentException ignored) {
+ }
+
+ // negative number
+ try {
+ MemorySize.parseBytes("-100 bytes");
+ fail("exception expected");
+ } catch (IllegalArgumentException ignored) {
+ }
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testParseNumberOverflow() {
+ MemorySize.parseBytes("100000000000000000000000000000000 bytes");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testParseNumberTimeUnitOverflow() {
+ MemorySize.parseBytes("100000000000000 tb");
+ }
+
+ @Test
+ public void testParseWithDefaultUnit() {
+ assertEquals(7, MemorySize.parse("7", MEGA_BYTES).getMebiBytes());
+ assertNotEquals(7, MemorySize.parse("7340032", MEGA_BYTES));
+ assertEquals(7, MemorySize.parse("7m", MEGA_BYTES).getMebiBytes());
+ assertEquals(7168, MemorySize.parse("7", MEGA_BYTES).getKibiBytes());
+ assertEquals(7168, MemorySize.parse("7m", MEGA_BYTES).getKibiBytes());
+ assertEquals(7, MemorySize.parse("7 m", MEGA_BYTES).getMebiBytes());
+ assertEquals(7, MemorySize.parse("7mb", MEGA_BYTES).getMebiBytes());
+ assertEquals(7, MemorySize.parse("7 mb", MEGA_BYTES).getMebiBytes());
+ assertEquals(7, MemorySize.parse("7mebibytes", MEGA_BYTES).getMebiBytes());
+ assertEquals(7, MemorySize.parse("7 mebibytes",
MEGA_BYTES).getMebiBytes());
+ }
+
+ @Test
+ public void testDivideByLong() {
+ final MemorySize memory = new MemorySize(100L);
+ assertThat(memory.divide(23), is(new MemorySize(4L)));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDivideByNegativeLong() {
+ final MemorySize memory = new MemorySize(100L);
+ memory.divide(-23L);
+ }
+
+ @Test
+ public void testToHumanReadableString() {
+ assertThat(new MemorySize(0L).toHumanReadableString(), is("0 bytes"));
+ assertThat(new MemorySize(1L).toHumanReadableString(), is("1 bytes"));
+ assertThat(new MemorySize(1024L).toHumanReadableString(), is("1024
bytes"));
+ assertThat(new MemorySize(1025L).toHumanReadableString(), is("1.001kb
(1025 bytes)"));
+ assertThat(new MemorySize(1536L).toHumanReadableString(), is("1.500kb
(1536 bytes)"));
+ assertThat(new MemorySize(1_000_000L).toHumanReadableString(),
is("976.563kb (1000000 bytes)"));
+ assertThat(
+ new MemorySize(1_000_000_000L).toHumanReadableString(), is("953.674mb
(1000000000 bytes)"));
+ assertThat(
+ new MemorySize(1_000_000_000_000L).toHumanReadableString(),
+ is("931.323gb (1000000000000 bytes)"));
+ assertThat(
+ new MemorySize(1_000_000_000_000_000L).toHumanReadableString(),
+ is("909.495tb (1000000000000000 bytes)"));
+ }
+}