This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 3896bbe61 [AMORO-3329] Add MemorySize for parsing memory value and 
unit (#3409)
3896bbe61 is described below

commit 3896bbe6188533d7357d5e58222d4c914add4de1
Author: Jzjsnow <5797...@qq.com>
AuthorDate: Tue Jan 21 14:08:58 2025 +0800

    [AMORO-3329] Add MemorySize for parsing memory value and unit (#3409)
    
    * [AMORO-3329] Add MemorySize for parsing memory value and unit
    
    * fixup! [AMORO-3329] Add MemorySize for parsing memory value and unit
    
    ---------
    
    Co-authored-by: jzjsnow <snow.jian...@gmail.com>
    Co-authored-by: ZhouJinsong <zhoujinsong0...@163.com>
---
 .../server/manager/FlinkOptimizerContainer.java    |  31 +-
 amoro-ams/src/main/resources/mysql/upgrade.sql     |   8 +
 amoro-ams/src/main/resources/postgres/upgrade.sql  |  36 +-
 .../manager/TestFlinkOptimizerContainer.java       |  45 ++-
 .../java/org/apache/amoro/utils/MemorySize.java    | 413 +++++++++++++++++++++
 .../org/apache/amoro/utils/MemorySizeTest.java     | 245 ++++++++++++
 6 files changed, 746 insertions(+), 32 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
index 314f13789..123a9dd12 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/manager/FlinkOptimizerContainer.java
@@ -29,6 +29,7 @@ import 
org.apache.amoro.shade.guava32.com.google.common.base.Supplier;
 import org.apache.amoro.shade.guava32.com.google.common.base.Suppliers;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import 
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.amoro.utils.MemorySize;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.rest.RestClusterClient;
@@ -228,9 +229,11 @@ public class FlinkOptimizerContainer extends 
AbstractResourceContainer {
             properties, resourceFlinkConf, 
FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY);
 
     resourceFlinkConf.putToOptions(
-        FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY, jobManagerMemory + 
"m");
+        FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY,
+        MemorySize.ofMebiBytes(jobManagerMemory).toString());
     resourceFlinkConf.putToOptions(
-        FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY, taskManagerMemory + 
"m");
+        FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY,
+        MemorySize.ofMebiBytes(taskManagerMemory).toString());
     resourceFlinkConf.putToOptions(
         FlinkConfKeys.YARN_APPLICATION_JOB_NAME,
         String.join(
@@ -354,28 +357,8 @@ public class FlinkOptimizerContainer extends 
AbstractResourceContainer {
     if (memoryStr == null || memoryStr.isEmpty()) {
       return 0;
     }
-    memoryStr = memoryStr.toLowerCase().trim().replaceAll("\\s", "");
-    Matcher matcher = Pattern.compile("(\\d+)([mg])").matcher(memoryStr);
-    if (matcher.matches()) {
-      long size = Long.parseLong(matcher.group(1));
-      String unit = matcher.group(2);
-      switch (unit) {
-        case "m":
-          return size;
-        case "g":
-          return size * 1024;
-        default:
-          LOG.error("Invalid memory size unit: {}, Please use m or g as the 
unit", unit);
-          return 0;
-      }
-    } else {
-      try {
-        return Long.parseLong(memoryStr);
-      } catch (NumberFormatException e) {
-        LOG.error("Invalid memory size format: {}", memoryStr);
-        return 0;
-      }
-    }
+
+    return MemorySize.parse(memoryStr).getMebiBytes();
   }
 
   private <T> T fetchCommandOutput(Process exec, Function<String, T> 
commandReader) {
diff --git a/amoro-ams/src/main/resources/mysql/upgrade.sql 
b/amoro-ams/src/main/resources/mysql/upgrade.sql
index f632c01c3..774a9b0b5 100644
--- a/amoro-ams/src/main/resources/mysql/upgrade.sql
+++ b/amoro-ams/src/main/resources/mysql/upgrade.sql
@@ -67,3 +67,11 @@ CREATE TABLE `http_session` (
     PRIMARY KEY(`session_id`, `context_path`, `virtual_host`),
     KEY `idx_session_expiry` (`expiry_time`)
 ) ENGINE = InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Http session store' 
ROW_FORMAT=DYNAMIC;
+
+-- update resource group memory unit: bare numeric JM/TM process sizes get an explicit 'MB' suffix, because MemorySize now parses unit-less values as bytes
+update resource_group set properties = JSON_SET(properties, 
'$."flink-conf.jobmanager.memory.process.size"', 
CONCAT(JSON_UNQUOTE(JSON_EXTRACT(properties, 
'$."flink-conf.jobmanager.memory.process.size"')), 'MB')) WHERE 
JSON_UNQUOTE(JSON_EXTRACT(properties, 
'$."flink-conf.jobmanager.memory.process.size"')) REGEXP '^[0-9]+$';
+update resource_group set properties = JSON_SET(properties, 
'$."flink-conf.taskmanager.memory.process.size"', 
CONCAT(JSON_UNQUOTE(JSON_EXTRACT(properties, 
'$."flink-conf.taskmanager.memory.process.size"')), 'MB')) WHERE 
JSON_UNQUOTE(JSON_EXTRACT(properties, 
'$."flink-conf.taskmanager.memory.process.size"')) REGEXP '^[0-9]+$';
+
+-- update resource memory unit: same bare-number -> 'MB' conversion for the resource table (keys are flat, hence the quoted $."..." paths)
+update resource set properties = JSON_SET(properties, 
'$."flink-conf.jobmanager.memory.process.size"', 
CONCAT(JSON_UNQUOTE(JSON_EXTRACT(properties, 
'$."flink-conf.jobmanager.memory.process.size"')), 'MB')) WHERE 
JSON_UNQUOTE(JSON_EXTRACT(properties, 
'$."flink-conf.jobmanager.memory.process.size"')) REGEXP '^[0-9]+$';
+update resource set properties = JSON_SET(properties, 
'$."flink-conf.taskmanager.memory.process.size"', 
CONCAT(JSON_UNQUOTE(JSON_EXTRACT(properties, 
'$."flink-conf.taskmanager.memory.process.size"')), 'MB')) WHERE 
JSON_UNQUOTE(JSON_EXTRACT(properties, 
'$."flink-conf.taskmanager.memory.process.size"')) REGEXP '^[0-9]+$';
diff --git a/amoro-ams/src/main/resources/postgres/upgrade.sql 
b/amoro-ams/src/main/resources/postgres/upgrade.sql
index 225cdbd49..5d4387422 100644
--- a/amoro-ams/src/main/resources/postgres/upgrade.sql
+++ b/amoro-ams/src/main/resources/postgres/upgrade.sql
@@ -84,4 +84,38 @@ COMMENT ON COLUMN http_session.last_save_time IS 'Last save 
time';
 COMMENT ON COLUMN http_session.expiry_time IS 'Expiry time';
 COMMENT ON COLUMN http_session.max_interval IS 'Max internal';
 COMMENT ON COLUMN http_session.data_store IS 'Session data store';
-COMMENT ON TABLE http_session IS 'Http session store';
\ No newline at end of file
+COMMENT ON TABLE http_session IS 'Http session store';
+
+-- update resource group memory unit: append 'MB' to bare numeric sizes; keys are flat dotted names, so each jsonb path has ONE element
+UPDATE resource_group
+SET properties = jsonb_set(
+        properties::jsonb,
+        '{flink-conf.jobmanager.memory.process.size}',
+        to_jsonb((properties::jsonb->>'flink-conf.jobmanager.memory.process.size') || 'MB')
+    )
+WHERE (properties::jsonb->>'flink-conf.jobmanager.memory.process.size') ~ '^[0-9]+$';
+
+UPDATE resource_group
+SET properties = jsonb_set(
+        properties::jsonb,
+        '{flink-conf.taskmanager.memory.process.size}',
+        to_jsonb((properties::jsonb->>'flink-conf.taskmanager.memory.process.size') || 'MB')
+    )
+WHERE (properties::jsonb->>'flink-conf.taskmanager.memory.process.size') ~ '^[0-9]+$';
+
+-- update resource memory unit (same conversion; the ::jsonb cast is a no-op for jsonb columns and parses text ones)
+UPDATE resource
+SET properties = jsonb_set(
+        properties::jsonb,
+        '{flink-conf.jobmanager.memory.process.size}',
+        to_jsonb((properties::jsonb->>'flink-conf.jobmanager.memory.process.size') || 'MB')
+    )
+WHERE (properties::jsonb->>'flink-conf.jobmanager.memory.process.size') ~ '^[0-9]+$';
+
+UPDATE resource
+SET properties = jsonb_set(
+        properties::jsonb,
+        '{flink-conf.taskmanager.memory.process.size}',
+        to_jsonb((properties::jsonb->>'flink-conf.taskmanager.memory.process.size') || 'MB')
+    )
+WHERE (properties::jsonb->>'flink-conf.taskmanager.memory.process.size') ~ '^[0-9]+$';
\ No newline at end of file
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
index 0862968b4..7e0e03d4a 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestFlinkOptimizerContainer.java
@@ -42,17 +42,36 @@ public class TestFlinkOptimizerContainer {
 
   @Test
   public void testParseMemorySize() {
-    Assert.assertEquals(100, container.parseMemorySize("100"));
+    // Sizes below 1 MiB (including bare numbers, which are now parsed as bytes) round down to 0 MiB.
+    Assert.assertEquals(0, container.parseMemorySize("100"));
+    Assert.assertEquals(0, container.parseMemorySize("100k"));
+    Assert.assertEquals(0, container.parseMemorySize("100kb"));
+    Assert.assertEquals(0, container.parseMemorySize("100 k"));
+    Assert.assertEquals(0, container.parseMemorySize("100 kb"));
+    Assert.assertEquals(0, container.parseMemorySize("100K"));
+    Assert.assertEquals(0, container.parseMemorySize("100KB"));
+    Assert.assertEquals(0, container.parseMemorySize("100 K"));
+    Assert.assertEquals(0, container.parseMemorySize("100 KB"));
     Assert.assertEquals(100, container.parseMemorySize("100m"));
+    Assert.assertEquals(100, container.parseMemorySize("100mb"));
     Assert.assertEquals(100, container.parseMemorySize("100 m"));
+    Assert.assertEquals(100, container.parseMemorySize("100 mb"));
     Assert.assertEquals(100, container.parseMemorySize("100M"));
+    Assert.assertEquals(100, container.parseMemorySize("100MB"));
     Assert.assertEquals(100, container.parseMemorySize("100 M"));
+    Assert.assertEquals(100, container.parseMemorySize("100 MB"));
     Assert.assertEquals(102400, container.parseMemorySize("100g"));
+    Assert.assertEquals(102400, container.parseMemorySize("100gb"));
     Assert.assertEquals(102400, container.parseMemorySize("100 g"));
+    Assert.assertEquals(102400, container.parseMemorySize("100 gb"));
     Assert.assertEquals(102400, container.parseMemorySize("100G"));
+    Assert.assertEquals(102400, container.parseMemorySize("100GB"));
     Assert.assertEquals(102400, container.parseMemorySize("100 G"));
-    Assert.assertEquals(0, container.parseMemorySize("G100G"));
-    Assert.assertEquals(0, container.parseMemorySize("100kb"));
+    Assert.assertEquals(102400, container.parseMemorySize("100 GB"));
+    // Text that does not start with a number must be rejected; without fail() a silent 0 would pass.
+    try {
+      container.parseMemorySize("G100G");
+      Assert.fail("Expected NumberFormatException for text that does not start with a number");
+    } catch (NumberFormatException e) {
+      Assert.assertEquals("text does not start with a number", e.getMessage());
+    }
   }
 
   @Test
@@ -110,11 +129,11 @@ public class TestFlinkOptimizerContainer {
         FlinkOptimizerContainer.FlinkConf.buildFor(prop, 
Maps.newHashMap()).build();
 
     Assert.assertEquals(
-        100L,
+        0,
         container.getMemorySizeValue(
             prop, conf, 
FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
     Assert.assertEquals(
-        100L,
+        0,
         container.getMemorySizeValue(
             prop, conf, 
FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
 
@@ -124,7 +143,7 @@ public class TestFlinkOptimizerContainer {
     conf = FlinkOptimizerContainer.FlinkConf.buildFor(prop, 
containerProperties).build();
     prop.clear();
     Assert.assertEquals(
-        200L,
+        0,
         container.getMemorySizeValue(
             prop, conf, 
FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
     Assert.assertEquals(
@@ -143,7 +162,7 @@ public class TestFlinkOptimizerContainer {
         container.getMemorySizeValue(
             prop, conf, 
FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
     Assert.assertEquals(
-        300L,
+        0,
         container.getMemorySizeValue(
             prop, conf, 
FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
 
@@ -157,5 +176,17 @@ public class TestFlinkOptimizerContainer {
         0L,
         container.getMemorySizeValue(
             prop, conf, 
FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
+
+    
prop.put(FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY,
 "400 MB");
+    
prop.put(FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY,
 "400 MB");
+    conf = FlinkOptimizerContainer.FlinkConf.buildFor(prop, 
Maps.newHashMap()).build();
+    Assert.assertEquals(
+        400L,
+        container.getMemorySizeValue(
+            prop, conf, 
FlinkOptimizerContainer.FlinkConfKeys.TASK_MANAGER_TOTAL_PROCESS_MEMORY));
+    Assert.assertEquals(
+        400L,
+        container.getMemorySizeValue(
+            prop, conf, 
FlinkOptimizerContainer.FlinkConfKeys.JOB_MANAGER_TOTAL_PROCESS_MEMORY));
   }
 }
diff --git a/amoro-common/src/main/java/org/apache/amoro/utils/MemorySize.java 
b/amoro-common/src/main/java/org/apache/amoro/utils/MemorySize.java
new file mode 100644
index 000000000..d53053375
--- /dev/null
+++ b/amoro-common/src/main/java/org/apache/amoro/utils/MemorySize.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.utils;
+
+import static 
org.apache.amoro.shade.guava32.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.amoro.utils.MemorySize.MemoryUnit.BYTES;
+import static org.apache.amoro.utils.MemorySize.MemoryUnit.GIGA_BYTES;
+import static org.apache.amoro.utils.MemorySize.MemoryUnit.KILO_BYTES;
+import static org.apache.amoro.utils.MemorySize.MemoryUnit.MEGA_BYTES;
+import static org.apache.amoro.utils.MemorySize.MemoryUnit.TERA_BYTES;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+/**
+ * MemorySize is a representation of a number of bytes, viewable in different units.
+ *
+ * <h2>Parsing</h2>
+ *
+ * <p>The size can be parsed from a text expression. If the expression is a pure number, the value
+ * will be interpreted as bytes. The recognized unit suffixes are listed on {@link MemoryUnit}.
+ *
+ * <h2>Formatting</h2>
+ *
+ * <p>{@link #toString()} always emits ASCII digits followed by a recognized unit suffix, so its
+ * output can be read back with {@link #parse(String)} regardless of the JVM default locale.
+ *
+ * <p>This class was originally copied from <a href="https://github.com/apache/flink">Apache
+ * Flink</a>. For any inquiries regarding the original code, please refer to <a
+ * href="https://github.com/apache/flink/blob/master/flink-core-api/src/main/java/org/apache/flink/configuration/MemorySize.java">org.apache.flink.configuration.MemorySize</a>.
+ */
+public class MemorySize implements java.io.Serializable, Comparable<MemorySize> {
+
+  private static final long serialVersionUID = 1L;
+
+  /** A size of zero bytes. */
+  public static final MemorySize ZERO = new MemorySize(0L);
+
+  /** The largest representable size ({@link Long#MAX_VALUE} bytes). */
+  public static final MemorySize MAX_VALUE = new MemorySize(Long.MAX_VALUE);
+
+  /** All units ordered from smallest to largest; the formatting methods depend on this order. */
+  private static final List<MemoryUnit> ORDERED_UNITS =
+      Arrays.asList(BYTES, KILO_BYTES, MEGA_BYTES, GIGA_BYTES, TERA_BYTES);
+
+  // ------------------------------------------------------------------------
+
+  /** The memory size, in bytes. */
+  private final long bytes;
+
+  /** Lazily computed and cached value returned by {@link #toString()}. */
+  private transient String stringified;
+
+  /** Lazily computed and cached value returned by {@link #toHumanReadableString()}. */
+  private transient String humanReadableStr;
+
+  /**
+   * Constructs a new MemorySize.
+   *
+   * @param bytes The size, in bytes. Must be zero or larger.
+   * @throws IllegalArgumentException If {@code bytes} is negative.
+   */
+  public MemorySize(long bytes) {
+    checkArgument(bytes >= 0, "bytes must be >= 0");
+    this.bytes = bytes;
+  }
+
+  /**
+   * Creates a MemorySize from a number of mebibytes (1 MiB = 1024 * 1024 bytes).
+   *
+   * @param mebiBytes The size, in mebibytes. Must be zero or larger.
+   * @return A MemorySize of {@code mebiBytes * 2^20} bytes.
+   * @throws IllegalArgumentException If {@code mebiBytes} is negative.
+   * @throws ArithmeticException If the size in bytes overflows a {@code long} (checked first).
+   */
+  public static MemorySize ofMebiBytes(long mebiBytes) {
+    // multiplyExact instead of "mebiBytes << 20": an overflowing shift silently yields a wrong size.
+    return new MemorySize(Math.multiplyExact(mebiBytes, MEGA_BYTES.getMultiplier()));
+  }
+
+  // ------------------------------------------------------------------------
+
+  /** Gets the memory size in bytes. */
+  public long getBytes() {
+    return bytes;
+  }
+
+  /** Gets the memory size in Kibibytes (= 1024 bytes), rounded down. */
+  public long getKibiBytes() {
+    return bytes >> 10;
+  }
+
+  /**
+   * Gets the memory size in Mebibytes (= 1024 Kibibytes), rounded down.
+   *
+   * @throws ArithmeticException If the result does not fit into an {@code int} (2^31 MiB or more).
+   */
+  public int getMebiBytes() {
+    // toIntExact instead of a plain (int) cast, which would silently wrap huge sizes.
+    return Math.toIntExact(bytes >> 20);
+  }
+
+  /** Gets the memory size in Gibibytes (= 1024 Mebibytes), rounded down. */
+  public long getGibiBytes() {
+    return bytes >> 30;
+  }
+
+  /** Gets the memory size in Tebibytes (= 1024 Gibibytes), rounded down. */
+  public long getTebiBytes() {
+    return bytes >> 40;
+  }
+
+  // ------------------------------------------------------------------------
+
+  @Override
+  public int hashCode() {
+    return (int) (bytes ^ (bytes >>> 32));
+  }
+
+  /** Two instances are equal when they have the same runtime class and the same byte count. */
+  @Override
+  public boolean equals(Object obj) {
+    return obj == this
+        || (obj != null
+            && obj.getClass() == this.getClass()
+            && ((MemorySize) obj).bytes == this.bytes);
+  }
+
+  /**
+   * Returns the exact size as {@code "<count> <unit>"}, for example {@code "100 mb"}. The result is
+   * computed once and cached.
+   */
+  @Override
+  public String toString() {
+    if (stringified == null) {
+      stringified = formatToString();
+    }
+
+    return stringified;
+  }
+
+  /**
+   * Formats the size exactly, without rounding.
+   *
+   * <p>Walks the units from smallest to largest, stops at the first unit that does not divide the
+   * byte count, and uses the unit just below it. If every unit divides the count (zero, or a
+   * multiple of 1 TiB) the size is rendered in bytes.
+   */
+  private String formatToString() {
+    MemoryUnit highestIntegerUnit =
+        IntStream.range(0, ORDERED_UNITS.size())
+            .sequential()
+            .filter(idx -> bytes % ORDERED_UNITS.get(idx).getMultiplier() != 0)
+            .boxed()
+            .findFirst()
+            .map(
+                idx -> {
+                  if (idx == 0) {
+                    return ORDERED_UNITS.get(0);
+                  } else {
+                    return ORDERED_UNITS.get(idx - 1);
+                  }
+                })
+            .orElse(BYTES);
+
+    // Locale.ROOT keeps the digits ASCII whatever the default locale is; parseBytes() only
+    // recognizes '0'-'9', so a localized number could not be parsed back.
+    return String.format(
+        Locale.ROOT,
+        "%d %s",
+        bytes / highestIntegerUnit.getMultiplier(),
+        highestIntegerUnit.getUnits()[1]);
+  }
+
+  /**
+   * Returns an approximate rendering such as {@code "1.500kb (1536 bytes)"}, or {@code "<n> bytes"}
+   * for sizes of at most 1024 bytes. The result is computed once and cached.
+   */
+  public String toHumanReadableString() {
+    if (humanReadableStr == null) {
+      humanReadableStr = formatToHumanReadableString();
+    }
+
+    return humanReadableStr;
+  }
+
+  /**
+   * Uses the largest unit whose multiplier is strictly smaller than the byte count (falling back to
+   * bytes) and prints the value with three decimals plus the exact byte count.
+   */
+  private String formatToHumanReadableString() {
+    MemoryUnit highestUnit =
+        IntStream.range(0, ORDERED_UNITS.size())
+            .sequential()
+            .filter(idx -> bytes > ORDERED_UNITS.get(idx).getMultiplier())
+            .boxed()
+            .max(Comparator.naturalOrder())
+            .map(ORDERED_UNITS::get)
+            .orElse(BYTES);
+
+    if (highestUnit == BYTES) {
+      // Same locale-independent formatting as the approximate branch below.
+      return String.format(Locale.ROOT, "%d %s", bytes, BYTES.getUnits()[1]);
+    } else {
+      double approximate = 1.0 * bytes / highestUnit.getMultiplier();
+      return String.format(
+          Locale.ROOT, "%.3f%s (%d bytes)", approximate, highestUnit.getUnits()[1], bytes);
+    }
+  }
+
+  /** Orders sizes by their byte count. */
+  @Override
+  public int compareTo(MemorySize that) {
+    return Long.compare(this.bytes, that.bytes);
+  }
+
+  // ------------------------------------------------------------------------
+  //  Calculations
+  // ------------------------------------------------------------------------
+
+  /**
+   * Returns the sum of this size and {@code that}.
+   *
+   * @throws ArithmeticException If the sum overflows a {@code long}.
+   */
+  public MemorySize add(MemorySize that) {
+    return new MemorySize(Math.addExact(this.bytes, that.bytes));
+  }
+
+  /**
+   * Returns this size minus {@code that}.
+   *
+   * @throws IllegalArgumentException If {@code that} is larger than this size.
+   */
+  public MemorySize subtract(MemorySize that) {
+    return new MemorySize(Math.subtractExact(this.bytes, that.bytes));
+  }
+
+  /**
+   * Returns this size multiplied by {@code multiplier}; any fractional byte is discarded.
+   *
+   * @throws IllegalArgumentException If {@code multiplier} is negative or NaN.
+   * @throws ArithmeticException If the product exceeds {@link Long#MAX_VALUE}.
+   */
+  public MemorySize multiply(double multiplier) {
+    checkArgument(multiplier >= 0, "multiplier must be >= 0");
+
+    BigDecimal product = BigDecimal.valueOf(this.bytes).multiply(BigDecimal.valueOf(multiplier));
+    if (product.compareTo(BigDecimal.valueOf(Long.MAX_VALUE)) > 0) {
+      throw new ArithmeticException("long overflow");
+    }
+    return new MemorySize(product.longValue());
+  }
+
+  /**
+   * Returns this size divided by {@code by}, rounded down.
+   *
+   * @throws IllegalArgumentException If {@code by} is negative.
+   * @throws ArithmeticException If {@code by} is zero.
+   */
+  public MemorySize divide(long by) {
+    checkArgument(by >= 0, "divisor must be >= 0");
+    return new MemorySize(bytes / by);
+  }
+
+  // ------------------------------------------------------------------------
+  //  Parsing
+  // ------------------------------------------------------------------------
+
+  /**
+   * Parses the given string as a MemorySize; see {@link #parseBytes(String)} for the syntax.
+   *
+   * @param text The string to parse
+   * @return The parsed MemorySize
+   * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
+   */
+  public static MemorySize parse(String text) throws IllegalArgumentException {
+    return new MemorySize(parseBytes(text));
+  }
+
+  /**
+   * Parses the given string with a default unit: if {@code text} carries no suffix after its
+   * leading digits, the short suffix of {@code defaultUnit} is appended before parsing.
+   *
+   * @param text The string to parse.
+   * @param defaultUnit specify the default unit.
+   * @return The parsed MemorySize.
+   * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
+   */
+  public static MemorySize parse(String text, MemoryUnit defaultUnit)
+      throws IllegalArgumentException {
+    if (!MemoryUnit.hasUnit(text)) {
+      return parse(text + defaultUnit.getUnits()[0]);
+    }
+
+    return parse(text);
+  }
+
+  /**
+   * Parses the given string as bytes: a non-negative integer, optionally followed by whitespace and
+   * a case-insensitive unit suffix listed on {@link MemoryUnit}. Without a suffix the number is
+   * taken as bytes.
+   *
+   * @param text The string to parse
+   * @return The parsed size, in bytes.
+   * @throws NumberFormatException If the text does not start with a digit.
+   * @throws IllegalArgumentException Thrown, if the expression cannot be parsed or overflows.
+   */
+  public static long parseBytes(String text) throws IllegalArgumentException {
+    checkArgument(text != null, "text can't be null");
+
+    final String trimmed = text.trim();
+    checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");
+
+    final int len = trimmed.length();
+    int pos = 0;
+
+    char current;
+    while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+      pos++;
+    }
+
+    final String number = trimmed.substring(0, pos);
+    final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
+
+    if (number.isEmpty()) {
+      throw new NumberFormatException("text does not start with a number");
+    }
+
+    final long value;
+    try {
+      value = Long.parseLong(number); // this throws a NumberFormatException on overflow
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException(
+          "The value '"
+              + number
+              + "' cannot be re represented as 64bit number (numeric overflow).");
+    }
+
+    final long multiplier = parseUnit(unit).map(MemoryUnit::getMultiplier).orElse(1L);
+    final long result = value * multiplier;
+
+    // check for overflow
+    if (result / multiplier != value) {
+      throw new IllegalArgumentException(
+          "The value '"
+              + text
+              + "' cannot be re represented as 64bit number of bytes (numeric overflow).");
+    }
+
+    return result;
+  }
+
+  /**
+   * Maps a lower-cased unit suffix to its {@link MemoryUnit}; an empty suffix yields empty.
+   *
+   * @throws IllegalArgumentException If the suffix is non-empty and not recognized.
+   */
+  private static Optional<MemoryUnit> parseUnit(String unit) {
+    if (matchesAny(unit, BYTES)) {
+      return Optional.of(BYTES);
+    } else if (matchesAny(unit, KILO_BYTES)) {
+      return Optional.of(KILO_BYTES);
+    } else if (matchesAny(unit, MEGA_BYTES)) {
+      return Optional.of(MEGA_BYTES);
+    } else if (matchesAny(unit, GIGA_BYTES)) {
+      return Optional.of(GIGA_BYTES);
+    } else if (matchesAny(unit, TERA_BYTES)) {
+      return Optional.of(TERA_BYTES);
+    } else if (!unit.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Memory size unit '"
+              + unit
+              + "' does not match any of the recognized units: "
+              + MemoryUnit.getAllUnits());
+    }
+
+    return Optional.empty();
+  }
+
+  /** Returns whether {@code str} equals one of the suffixes accepted for {@code unit}. */
+  private static boolean matchesAny(String str, MemoryUnit unit) {
+    for (String s : unit.getUnits()) {
+      if (s.equals(str)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Enum which defines memory unit, mostly used to parse value from configuration file.
+   *
+   * <p>To make larger values more compact, the common size suffixes are supported:
+   *
+   * <ul>
+   *   <li>1b or 1bytes (bytes)
+   *   <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
+   *   <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
+   *   <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
+   *   <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
+   * </ul>
+   */
+  public enum MemoryUnit {
+    BYTES(new String[] {"b", "bytes"}, 1L),
+    KILO_BYTES(new String[] {"k", "kb", "kibibytes"}, 1024L),
+    MEGA_BYTES(new String[] {"m", "mb", "mebibytes"}, 1024L * 1024L),
+    GIGA_BYTES(new String[] {"g", "gb", "gibibytes"}, 1024L * 1024L * 1024L),
+    TERA_BYTES(new String[] {"t", "tb", "tebibytes"}, 1024L * 1024L * 1024L * 1024L);
+
+    /** Accepted lower-case suffixes; [0] is appended as a default unit, [1] is used for output. */
+    private final String[] units;
+
+    /** Number of bytes in one unit. */
+    private final long multiplier;
+
+    MemoryUnit(String[] units, long multiplier) {
+      this.units = units;
+      this.multiplier = multiplier;
+    }
+
+    /** Returns the accepted suffixes of this unit (the internal array, not a copy). */
+    public String[] getUnits() {
+      return units;
+    }
+
+    /** Returns the number of bytes in one unit. */
+    public long getMultiplier() {
+      return multiplier;
+    }
+
+    /** Describes every recognized suffix, grouped per unit; used in parse error messages. */
+    public static String getAllUnits() {
+      return concatenateUnits(
+          BYTES.getUnits(),
+          KILO_BYTES.getUnits(),
+          MEGA_BYTES.getUnits(),
+          GIGA_BYTES.getUnits(),
+          TERA_BYTES.getUnits());
+    }
+
+    /**
+     * Returns whether {@code text} has anything after its leading digits. The suffix is not checked
+     * against the recognized units.
+     *
+     * @throws IllegalArgumentException If {@code text} is null, empty or whitespace-only.
+     */
+    public static boolean hasUnit(String text) {
+      checkArgument(text != null, "text can't be null");
+
+      final String trimmed = text.trim();
+      checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");
+
+      final int len = trimmed.length();
+      int pos = 0;
+
+      char current;
+      while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+        pos++;
+      }
+
+      final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
+
+      return unit.length() > 0;
+    }
+
+    /** Joins suffix groups as {@code "(a | b) / (c | d)"}. */
+    private static String concatenateUnits(final String[]... allUnits) {
+      final StringBuilder builder = new StringBuilder(128);
+
+      for (String[] units : allUnits) {
+        builder.append('(');
+
+        for (String unit : units) {
+          builder.append(unit);
+          builder.append(" | ");
+        }
+
+        builder.setLength(builder.length() - 3);
+        builder.append(") / ");
+      }
+
+      builder.setLength(builder.length() - 3);
+      return builder.toString();
+    }
+  }
+}
diff --git 
a/amoro-common/src/test/java/org/apache/amoro/utils/MemorySizeTest.java 
b/amoro-common/src/test/java/org/apache/amoro/utils/MemorySizeTest.java
new file mode 100644
index 000000000..7838e831c
--- /dev/null
+++ b/amoro-common/src/test/java/org/apache/amoro/utils/MemorySizeTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.utils;
+
+import static org.apache.amoro.utils.MemorySize.MemoryUnit.MEGA_BYTES;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+/**
+ * Tests for the {@link MemorySize} class: unit conversion, parsing with and without a default
+ * unit, rejection of invalid input, division and human-readable formatting.
+ */
+public class MemorySizeTest {
+
+  @Test
+  public void testUnitConversion() {
+    final MemorySize zero = MemorySize.ZERO;
+    assertEquals(0, zero.getBytes());
+    assertEquals(0, zero.getKibiBytes());
+    assertEquals(0, zero.getMebiBytes());
+    assertEquals(0, zero.getGibiBytes());
+    assertEquals(0, zero.getTebiBytes());
+
+    // Conversions to larger units truncate any remainder.
+    final MemorySize bytes = new MemorySize(955);
+    assertEquals(955, bytes.getBytes());
+    assertEquals(0, bytes.getKibiBytes());
+    assertEquals(0, bytes.getMebiBytes());
+    assertEquals(0, bytes.getGibiBytes());
+    assertEquals(0, bytes.getTebiBytes());
+
+    final MemorySize kilos = new MemorySize(18500);
+    assertEquals(18500, kilos.getBytes());
+    assertEquals(18, kilos.getKibiBytes());
+    assertEquals(0, kilos.getMebiBytes());
+    assertEquals(0, kilos.getGibiBytes());
+    assertEquals(0, kilos.getTebiBytes());
+
+    final MemorySize megas = new MemorySize(15 * 1024 * 1024);
+    assertEquals(15_728_640, megas.getBytes());
+    assertEquals(15_360, megas.getKibiBytes());
+    assertEquals(15, megas.getMebiBytes());
+    assertEquals(0, megas.getGibiBytes());
+    assertEquals(0, megas.getTebiBytes());
+
+    final MemorySize teras = new MemorySize(2L * 1024 * 1024 * 1024 * 1024 + 10);
+    assertEquals(2199023255562L, teras.getBytes());
+    assertEquals(2147483648L, teras.getKibiBytes());
+    assertEquals(2097152, teras.getMebiBytes());
+    assertEquals(2048, teras.getGibiBytes());
+    assertEquals(2, teras.getTebiBytes());
+  }
+
+  /** A negative byte count is rejected. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalid() {
+    new MemorySize(-1);
+  }
+
+  /** A bare number, or one followed by a byte alias, is parsed as a byte count. */
+  @Test
+  public void testParseBytes() {
+    assertEquals(1234, MemorySize.parseBytes("1234"));
+    assertEquals(1234, MemorySize.parseBytes("1234b"));
+    assertEquals(1234, MemorySize.parseBytes("1234 b"));
+    assertEquals(1234, MemorySize.parseBytes("1234bytes"));
+    assertEquals(1234, MemorySize.parseBytes("1234 bytes"));
+  }
+
+  @Test
+  public void testParseKibiBytes() {
+    assertEquals(667766, MemorySize.parse("667766k").getKibiBytes());
+    assertEquals(667766, MemorySize.parse("667766 k").getKibiBytes());
+    assertEquals(667766, MemorySize.parse("667766kb").getKibiBytes());
+    assertEquals(667766, MemorySize.parse("667766 kb").getKibiBytes());
+    assertEquals(667766, MemorySize.parse("667766kibibytes").getKibiBytes());
+    assertEquals(667766, MemorySize.parse("667766 kibibytes").getKibiBytes());
+  }
+
+  @Test
+  public void testParseMebiBytes() {
+    assertEquals(7657623, MemorySize.parse("7657623m").getMebiBytes());
+    assertEquals(7657623, MemorySize.parse("7657623 m").getMebiBytes());
+    assertEquals(7657623, MemorySize.parse("7657623mb").getMebiBytes());
+    assertEquals(7657623, MemorySize.parse("7657623 mb").getMebiBytes());
+    assertEquals(7657623, MemorySize.parse("7657623mebibytes").getMebiBytes());
+    assertEquals(7657623, MemorySize.parse("7657623 mebibytes").getMebiBytes());
+  }
+
+  @Test
+  public void testParseGibiBytes() {
+    assertEquals(987654, MemorySize.parse("987654g").getGibiBytes());
+    assertEquals(987654, MemorySize.parse("987654 g").getGibiBytes());
+    assertEquals(987654, MemorySize.parse("987654gb").getGibiBytes());
+    assertEquals(987654, MemorySize.parse("987654 gb").getGibiBytes());
+    assertEquals(987654, MemorySize.parse("987654gibibytes").getGibiBytes());
+    assertEquals(987654, MemorySize.parse("987654 gibibytes").getGibiBytes());
+  }
+
+  @Test
+  public void testParseTebiBytes() {
+    assertEquals(1234567, MemorySize.parse("1234567t").getTebiBytes());
+    assertEquals(1234567, MemorySize.parse("1234567 t").getTebiBytes());
+    assertEquals(1234567, MemorySize.parse("1234567tb").getTebiBytes());
+    assertEquals(1234567, MemorySize.parse("1234567 tb").getTebiBytes());
+    assertEquals(1234567, MemorySize.parse("1234567tebibytes").getTebiBytes());
+    assertEquals(1234567, MemorySize.parse("1234567 tebibytes").getTebiBytes());
+  }
+
+  /** Upper-case unit aliases are accepted. */
+  @Test
+  public void testUpperCase() {
+    assertEquals(1L, MemorySize.parse("1 B").getBytes());
+    assertEquals(1L, MemorySize.parse("1 K").getKibiBytes());
+    assertEquals(1L, MemorySize.parse("1 M").getMebiBytes());
+    assertEquals(1L, MemorySize.parse("1 G").getGibiBytes());
+    assertEquals(1L, MemorySize.parse("1 T").getTebiBytes());
+  }
+
+  /** Padding around the number and between the number and the unit is ignored. */
+  @Test
+  public void testTrimBeforeParse() {
+    assertEquals(155L, MemorySize.parseBytes("      155      "));
+    assertEquals(155L, MemorySize.parseBytes("      155      bytes   "));
+  }
+
+  /** Every malformed input below must be rejected with an {@link IllegalArgumentException}. */
+  @Test
+  public void testParseInvalid() {
+    // null
+    try {
+      MemorySize.parseBytes(null);
+      fail("exception expected");
+    } catch (IllegalArgumentException ignored) {
+    }
+
+    // empty
+    try {
+      MemorySize.parseBytes("");
+      fail("exception expected");
+    } catch (IllegalArgumentException ignored) {
+    }
+
+    // blank
+    try {
+      MemorySize.parseBytes("     ");
+      fail("exception expected");
+    } catch (IllegalArgumentException ignored) {
+    }
+
+    // no number
+    try {
+      MemorySize.parseBytes("foobar or fubar or foo bazz");
+      fail("exception expected");
+    } catch (IllegalArgumentException ignored) {
+    }
+
+    // wrong unit
+    try {
+      MemorySize.parseBytes("16 gjah");
+      fail("exception expected");
+    } catch (IllegalArgumentException ignored) {
+    }
+
+    // multiple numbers
+    try {
+      MemorySize.parseBytes("16 16 17 18 bytes");
+      fail("exception expected");
+    } catch (IllegalArgumentException ignored) {
+    }
+
+    // negative number
+    try {
+      MemorySize.parseBytes("-100 bytes");
+      fail("exception expected");
+    } catch (IllegalArgumentException ignored) {
+    }
+  }
+
+  /** A number too large to fit in a {@code long} is rejected. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testParseNumberOverflow() {
+    MemorySize.parseBytes("100000000000000000000000000000000 bytes");
+  }
+
+  /** A number that only overflows once multiplied by the tebibyte unit is rejected. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testParseNumberTimeUnitOverflow() {
+    MemorySize.parseBytes("100000000000000 tb");
+  }
+
+  /** The default unit applies only when the text carries no unit of its own. */
+  @Test
+  public void testParseWithDefaultUnit() {
+    assertEquals(7, MemorySize.parse("7", MEGA_BYTES).getMebiBytes());
+    // "7340032" bytes would be 7 MiB, but with MB as the default unit it means 7340032 MiB.
+    // Compare the MiB value: comparing the int 7 against the MemorySize object itself can never
+    // be equal and would therefore verify nothing.
+    assertNotEquals(7, MemorySize.parse("7340032", MEGA_BYTES).getMebiBytes());
+    assertEquals(7, MemorySize.parse("7m", MEGA_BYTES).getMebiBytes());
+    assertEquals(7168, MemorySize.parse("7", MEGA_BYTES).getKibiBytes());
+    assertEquals(7168, MemorySize.parse("7m", MEGA_BYTES).getKibiBytes());
+    assertEquals(7, MemorySize.parse("7 m", MEGA_BYTES).getMebiBytes());
+    assertEquals(7, MemorySize.parse("7mb", MEGA_BYTES).getMebiBytes());
+    assertEquals(7, MemorySize.parse("7 mb", MEGA_BYTES).getMebiBytes());
+    assertEquals(7, MemorySize.parse("7mebibytes", MEGA_BYTES).getMebiBytes());
+    assertEquals(7, MemorySize.parse("7 mebibytes", MEGA_BYTES).getMebiBytes());
+  }
+
+  /** Division by a long truncates the remainder (100 / 23 = 4). */
+  @Test
+  public void testDivideByLong() {
+    final MemorySize memory = new MemorySize(100L);
+    assertThat(memory.divide(23), is(new MemorySize(4L)));
+  }
+
+  /** Dividing by a negative number is rejected. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testDivideByNegativeLong() {
+    final MemorySize memory = new MemorySize(100L);
+    memory.divide(-23L);
+  }
+
+  /**
+   * Sizes up to 1024 bytes print as plain bytes. Larger sizes print in the largest fitting unit
+   * with three decimals, followed by the exact byte count.
+   */
+  @Test
+  public void testToHumanReadableString() {
+    assertThat(new MemorySize(0L).toHumanReadableString(), is("0 bytes"));
+    assertThat(new MemorySize(1L).toHumanReadableString(), is("1 bytes"));
+    assertThat(new MemorySize(1024L).toHumanReadableString(), is("1024 bytes"));
+    assertThat(new MemorySize(1025L).toHumanReadableString(), is("1.001kb (1025 bytes)"));
+    assertThat(new MemorySize(1536L).toHumanReadableString(), is("1.500kb (1536 bytes)"));
+    assertThat(
+        new MemorySize(1_000_000L).toHumanReadableString(), is("976.563kb (1000000 bytes)"));
+    assertThat(
+        new MemorySize(1_000_000_000L).toHumanReadableString(), is("953.674mb (1000000000 bytes)"));
+    assertThat(
+        new MemorySize(1_000_000_000_000L).toHumanReadableString(),
+        is("931.323gb (1000000000000 bytes)"));
+    assertThat(
+        new MemorySize(1_000_000_000_000_000L).toHumanReadableString(),
+        is("909.495tb (1000000000000000 bytes)"));
+  }
+}


Reply via email to