This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 98e07d14213 [FLINK-35820] Converting Duration to String fails for big
values (#25077)
98e07d14213 is described below
commit 98e07d14213e2b739ac231f1863b1b6f71031851
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Jul 12 13:23:54 2024 +0200
[FLINK-35820] Converting Duration to String fails for big values (#25077)
---
.../main/java/org/apache/flink/util/TimeUtils.java | 81 ++++++++++++++++------
.../flink/util/TimeUtilsPrettyPrintingTest.java | 6 +-
.../java/org/apache/flink/util/TimeUtilsTest.java | 2 +
3 files changed, 68 insertions(+), 21 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
b/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
index 80d38c78078..3b47da1a73c 100644
--- a/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.util;
import org.apache.flink.api.common.time.Time;
+import java.math.BigInteger;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
@@ -39,6 +40,8 @@ public class TimeUtils {
private static final Map<String, ChronoUnit> LABEL_TO_UNIT_MAP =
Collections.unmodifiableMap(initMap());
+ private static final BigInteger NANOS_PER_SECOND =
BigInteger.valueOf(1_000_000_000L);
+
/**
* Parse the given string to a java {@link Duration}. The string is in
format "{length
* value}{time unit label}", e.g. "123ms", "321 s". If no time unit label
is specified, it will
@@ -79,30 +82,45 @@ public class TimeUtils {
throw new NumberFormatException("text does not start with a
number");
}
- final long value;
+ final BigInteger value;
try {
- value = Long.parseLong(number); // this throws a
NumberFormatException on overflow
+ value = new BigInteger(number); // this throws a
NumberFormatException
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
- "The value '"
- + number
- + "' cannot be re represented as 64bit number
(numeric overflow).");
+ "The value '" + number + "' cannot be represented as an
integer number.", e);
}
+ final ChronoUnit unit;
if (unitLabel.isEmpty()) {
- return Duration.of(value, ChronoUnit.MILLIS);
- }
-
- ChronoUnit unit = LABEL_TO_UNIT_MAP.get(unitLabel);
- if (unit != null) {
- return Duration.of(value, unit);
+ unit = ChronoUnit.MILLIS;
} else {
+ unit = LABEL_TO_UNIT_MAP.get(unitLabel);
+ }
+ if (unit == null) {
throw new IllegalArgumentException(
"Time interval unit label '"
+ unitLabel
+ "' does not match any of the recognized units: "
+ TimeUnit.getAllUnits());
}
+
+ try {
+ return convertBigIntToDuration(value, unit);
+ } catch (ArithmeticException e) {
+ throw new IllegalArgumentException(
+ "The value '"
+ + number
+ + "' cannot be represented as java.time.Duration
(numeric overflow).",
+ e);
+ }
+ }
+
+ private static Duration convertBigIntToDuration(BigInteger value,
ChronoUnit unit) {
+ final BigInteger nanos =
value.multiply(BigInteger.valueOf(unit.getDuration().toNanos()));
+
+ final BigInteger[] dividedAndRemainder =
nanos.divideAndRemainder(NANOS_PER_SECOND);
+ return Duration.ofSeconds(dividedAndRemainder[0].longValueExact())
+ .plusNanos(dividedAndRemainder[1].longValueExact());
}
private static Map<String, ChronoUnit> initMap() {
@@ -136,17 +154,35 @@ public class TimeUtils {
* <b>NOTE:</b> It supports only durations that fit into long.
*/
public static String formatWithHighestUnit(Duration duration) {
- long nanos = duration.toNanos();
+ BigInteger nanos = toNanos(duration);
TimeUnit highestIntegerUnit = getHighestIntegerUnit(nanos);
return String.format(
- "%d %s",
- nanos / highestIntegerUnit.unit.getDuration().toNanos(),
+ "%s %s",
+ nanos.divide(highestIntegerUnit.getUnitAsNanos()),
highestIntegerUnit.getLabels().get(0));
}
- private static TimeUnit getHighestIntegerUnit(long nanos) {
- if (nanos == 0) {
+ /**
+ * Converted from {@link Duration#toNanos()}, but produces {@link
BigInteger} and does not throw
+ * an exception on overflow.
+ */
+ private static BigInteger toNanos(Duration duration) {
+ long tempSeconds = duration.getSeconds();
+ long tempNanos = duration.getNano();
+ if (tempSeconds < 0) {
+ // change the seconds and nano value to
+ // handle Long.MIN_VALUE case
+ tempSeconds = tempSeconds + 1;
+ tempNanos = tempNanos - NANOS_PER_SECOND.longValue();
+ }
+ return BigInteger.valueOf(tempSeconds)
+ .multiply(NANOS_PER_SECOND)
+ .add(BigInteger.valueOf(tempNanos));
+ }
+
+ private static TimeUnit getHighestIntegerUnit(BigInteger nanos) {
+ if (nanos.compareTo(BigInteger.ZERO) == 0) {
return TimeUnit.MILLISECONDS;
}
@@ -162,7 +198,7 @@ public class TimeUtils {
TimeUnit highestIntegerUnit = null;
for (TimeUnit timeUnit : orderedUnits) {
- if (nanos % timeUnit.unit.getDuration().toNanos() != 0) {
+ if
(nanos.remainder(timeUnit.getUnitAsNanos()).compareTo(BigInteger.ZERO) != 0) {
break;
}
highestIntegerUnit = timeUnit;
@@ -187,12 +223,13 @@ public class TimeUtils {
private final ChronoUnit unit;
+ private final BigInteger unitAsNanos;
+
TimeUnit(ChronoUnit unit, String[]... labels) {
this.unit = unit;
+ this.unitAsNanos =
BigInteger.valueOf(unit.getDuration().toNanos());
this.labels =
- Arrays.stream(labels)
- .flatMap(ls -> Arrays.stream(ls))
- .collect(Collectors.toList());
+
Arrays.stream(labels).flatMap(Arrays::stream).collect(Collectors.toList());
}
/**
@@ -219,6 +256,10 @@ public class TimeUtils {
return unit;
}
+ public BigInteger getUnitAsNanos() {
+ return unitAsNanos;
+ }
+
public static String getAllUnits() {
return Arrays.stream(TimeUnit.values())
.map(TimeUnit::createTimeUnitString)
diff --git
a/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java
b/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java
index d23ae0090d1..767c9c2b231 100644
---
a/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java
+++
b/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java
@@ -43,7 +43,11 @@ class TimeUtilsPrettyPrintingTest {
Arguments.of(Duration.ofHours(23), "23 h"),
Arguments.of(Duration.ofMillis(-1), "-1 ms"),
Arguments.of(Duration.ofMillis(TimeUnit.DAYS.toMillis(1)), "1
d"),
- Arguments.of(Duration.ofHours(24), "1 d"));
+ Arguments.of(Duration.ofHours(24), "1 d"),
+ Arguments.of(Duration.ofMillis(9223372036854775807L),
"9223372036854775807 ms"),
+ Arguments.of(
+ Duration.ofMillis(9223372036854775807L).plusNanos(1),
+ "9223372036854775807000001 ns"));
}
@ParameterizedTest
diff --git a/flink-core/src/test/java/org/apache/flink/util/TimeUtilsTest.java
b/flink-core/src/test/java/org/apache/flink/util/TimeUtilsTest.java
index 349604fd042..f4010cb0fb3 100644
--- a/flink-core/src/test/java/org/apache/flink/util/TimeUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/TimeUtilsTest.java
@@ -40,6 +40,8 @@ class TimeUtilsTest {
assertThat(TimeUtils.parseDuration("424562nanosecond").getNano()).isEqualTo(424562);
assertThat(TimeUtils.parseDuration("424562nanoseconds").getNano()).isEqualTo(424562);
assertThat(TimeUtils.parseDuration("424562
ns").getNano()).isEqualTo(424562);
+ assertThat(TimeUtils.parseDuration("9223372036854775807000001 ns"))
+
.isEqualByComparingTo(Duration.ofMillis(9223372036854775807L).plusNanos(1));
}
@Test