This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 98e07d14213 [FLINK-35820] Converting Duration to String fails for big values (#25077)
98e07d14213 is described below

commit 98e07d14213e2b739ac231f1863b1b6f71031851
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Jul 12 13:23:54 2024 +0200

    [FLINK-35820] Converting Duration to String fails for big values (#25077)
---
 .../main/java/org/apache/flink/util/TimeUtils.java | 81 ++++++++++++++++------
 .../flink/util/TimeUtilsPrettyPrintingTest.java    |  6 +-
 .../java/org/apache/flink/util/TimeUtilsTest.java  |  2 +
 3 files changed, 68 insertions(+), 21 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java b/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
index 80d38c78078..3b47da1a73c 100644
--- a/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.util;
 
 import org.apache.flink.api.common.time.Time;
 
+import java.math.BigInteger;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
@@ -39,6 +40,8 @@ public class TimeUtils {
     private static final Map<String, ChronoUnit> LABEL_TO_UNIT_MAP =
             Collections.unmodifiableMap(initMap());
 
+    private static final BigInteger NANOS_PER_SECOND = 
BigInteger.valueOf(1_000_000_000L);
+
     /**
      * Parse the given string to a java {@link Duration}. The string is in 
format "{length
      * value}{time unit label}", e.g. "123ms", "321 s". If no time unit label 
is specified, it will
@@ -79,30 +82,45 @@ public class TimeUtils {
             throw new NumberFormatException("text does not start with a 
number");
         }
 
-        final long value;
+        final BigInteger value;
         try {
-            value = Long.parseLong(number); // this throws a 
NumberFormatException on overflow
+            value = new BigInteger(number); // this throws a 
NumberFormatException
         } catch (NumberFormatException e) {
             throw new IllegalArgumentException(
-                    "The value '"
-                            + number
-                            + "' cannot be re represented as 64bit number 
(numeric overflow).");
+                    "The value '" + number + "' cannot be represented as an 
integer number.", e);
         }
 
+        final ChronoUnit unit;
         if (unitLabel.isEmpty()) {
-            return Duration.of(value, ChronoUnit.MILLIS);
-        }
-
-        ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel);
-        if (unit != null) {
-            return Duration.of(value, unit);
+            unit = ChronoUnit.MILLIS;
         } else {
+            unit = LABEL_TO_UNIT_MAP.get(unitLabel);
+        }
+        if (unit == null) {
             throw new IllegalArgumentException(
                     "Time interval unit label '"
                             + unitLabel
                             + "' does not match any of the recognized units: "
                             + TimeUnit.getAllUnits());
         }
+
+        try {
+            return convertBigIntToDuration(value, unit);
+        } catch (ArithmeticException e) {
+            throw new IllegalArgumentException(
+                    "The value '"
+                            + number
+                            + "' cannot be represented as java.time.Duration 
(numeric overflow).",
+                    e);
+        }
+    }
+
+    private static Duration convertBigIntToDuration(BigInteger value, 
ChronoUnit unit) {
+        final BigInteger nanos = 
value.multiply(BigInteger.valueOf(unit.getDuration().toNanos()));
+
+        final BigInteger[] dividedAndRemainder = 
nanos.divideAndRemainder(NANOS_PER_SECOND);
+        return Duration.ofSeconds(dividedAndRemainder[0].longValueExact())
+                .plusNanos(dividedAndRemainder[1].longValueExact());
     }
 
     private static Map<String, ChronoUnit> initMap() {
@@ -136,17 +154,35 @@ public class TimeUtils {
      * <b>NOTE:</b> It supports only durations that fit into long.
      */
     public static String formatWithHighestUnit(Duration duration) {
-        long nanos = duration.toNanos();
+        BigInteger nanos = toNanos(duration);
 
         TimeUnit highestIntegerUnit = getHighestIntegerUnit(nanos);
         return String.format(
-                "%d %s",
-                nanos / highestIntegerUnit.unit.getDuration().toNanos(),
+                "%s %s",
+                nanos.divide(highestIntegerUnit.getUnitAsNanos()),
                 highestIntegerUnit.getLabels().get(0));
     }
 
-    private static TimeUnit getHighestIntegerUnit(long nanos) {
-        if (nanos == 0) {
+    /**
+     * Converted from {@link Duration#toNanos()}, but produces {@link 
BigInteger} and does not throw
+     * an exception on overflow.
+     */
+    private static BigInteger toNanos(Duration duration) {
+        long tempSeconds = duration.getSeconds();
+        long tempNanos = duration.getNano();
+        if (tempSeconds < 0) {
+            // change the seconds and nano value to
+            // handle Long.MIN_VALUE case
+            tempSeconds = tempSeconds + 1;
+            tempNanos = tempNanos - NANOS_PER_SECOND.longValue();
+        }
+        return BigInteger.valueOf(tempSeconds)
+                .multiply(NANOS_PER_SECOND)
+                .add(BigInteger.valueOf(tempNanos));
+    }
+
+    private static TimeUnit getHighestIntegerUnit(BigInteger nanos) {
+        if (nanos.compareTo(BigInteger.ZERO) == 0) {
             return TimeUnit.MILLISECONDS;
         }
 
@@ -162,7 +198,7 @@ public class TimeUtils {
 
         TimeUnit highestIntegerUnit = null;
         for (TimeUnit timeUnit : orderedUnits) {
-            if (nanos % timeUnit.unit.getDuration().toNanos() != 0) {
+            if 
(nanos.remainder(timeUnit.getUnitAsNanos()).compareTo(BigInteger.ZERO) != 0) {
                 break;
             }
             highestIntegerUnit = timeUnit;
@@ -187,12 +223,13 @@ public class TimeUtils {
 
         private final ChronoUnit unit;
 
+        private final BigInteger unitAsNanos;
+
         TimeUnit(ChronoUnit unit, String[]... labels) {
             this.unit = unit;
+            this.unitAsNanos = 
BigInteger.valueOf(unit.getDuration().toNanos());
             this.labels =
-                    Arrays.stream(labels)
-                            .flatMap(ls -> Arrays.stream(ls))
-                            .collect(Collectors.toList());
+                    
Arrays.stream(labels).flatMap(Arrays::stream).collect(Collectors.toList());
         }
 
         /**
@@ -219,6 +256,10 @@ public class TimeUtils {
             return unit;
         }
 
+        public BigInteger getUnitAsNanos() {
+            return unitAsNanos;
+        }
+
         public static String getAllUnits() {
             return Arrays.stream(TimeUnit.values())
                     .map(TimeUnit::createTimeUnitString)
diff --git a/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java b/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java
index d23ae0090d1..767c9c2b231 100644
--- a/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java
@@ -43,7 +43,11 @@ class TimeUtilsPrettyPrintingTest {
                 Arguments.of(Duration.ofHours(23), "23 h"),
                 Arguments.of(Duration.ofMillis(-1), "-1 ms"),
                 Arguments.of(Duration.ofMillis(TimeUnit.DAYS.toMillis(1)), "1 
d"),
-                Arguments.of(Duration.ofHours(24), "1 d"));
+                Arguments.of(Duration.ofHours(24), "1 d"),
+                Arguments.of(Duration.ofMillis(9223372036854775807L), 
"9223372036854775807 ms"),
+                Arguments.of(
+                        Duration.ofMillis(9223372036854775807L).plusNanos(1),
+                        "9223372036854775807000001 ns"));
     }
 
     @ParameterizedTest
diff --git a/flink-core/src/test/java/org/apache/flink/util/TimeUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/TimeUtilsTest.java
index 349604fd042..f4010cb0fb3 100644
--- a/flink-core/src/test/java/org/apache/flink/util/TimeUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/TimeUtilsTest.java
@@ -40,6 +40,8 @@ class TimeUtilsTest {
         
assertThat(TimeUtils.parseDuration("424562nanosecond").getNano()).isEqualTo(424562);
         
assertThat(TimeUtils.parseDuration("424562nanoseconds").getNano()).isEqualTo(424562);
         assertThat(TimeUtils.parseDuration("424562 
ns").getNano()).isEqualTo(424562);
+        assertThat(TimeUtils.parseDuration("9223372036854775807000001 ns"))
+                
.isEqualByComparingTo(Duration.ofMillis(9223372036854775807L).plusNanos(1));
     }
 
     @Test

Reply via email to