This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b3e3dce8df65670bf82f2e6cd936593bbccd1924
Author: Rui Fan <[email protected]>
AuthorDate: Tue Dec 5 19:54:29 2023 +0800

    [FLINK-33752][Configuration] Change the displayed timeunit to day when the 
duration is an integral multiple of 1 day
---
 .../main/java/org/apache/flink/util/TimeUtils.java | 45 ++++++++++------------
 .../flink/util/TimeUtilsPrettyPrintingTest.java    |  7 +++-
 .../plan/stream/sql/agg/WindowAggregateTest.xml    | 12 +++---
 3 files changed, 33 insertions(+), 31 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
index 8127989fcdb..f4b31cff35f 100644
--- a/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
@@ -29,7 +29,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -139,7 +138,19 @@ public class TimeUtils {
     public static String formatWithHighestUnit(Duration duration) {
         long nanos = duration.toNanos();
 
-        List<TimeUnit> orderedUnits =
+        TimeUnit highestIntegerUnit = getHighestIntegerUnit(nanos);
+        return String.format(
+                "%d %s",
+                nanos / highestIntegerUnit.unit.getDuration().toNanos(),
+                highestIntegerUnit.getLabels().get(0));
+    }
+
+    private static TimeUnit getHighestIntegerUnit(long nanos) {
+        if (nanos == 0) {
+            return TimeUnit.MILLISECONDS;
+        }
+
+        final List<TimeUnit> orderedUnits =
                 Arrays.asList(
                         TimeUnit.NANOSECONDS,
                         TimeUnit.MICROSECONDS,
@@ -149,29 +160,15 @@ public class TimeUtils {
                         TimeUnit.HOURS,
                         TimeUnit.DAYS);
 
-        TimeUnit highestIntegerUnit =
-                IntStream.range(0, orderedUnits.size())
-                        .sequential()
-                        .filter(
-                                idx ->
-                                        nanos % 
orderedUnits.get(idx).unit.getDuration().toNanos()
-                                                != 0)
-                        .boxed()
-                        .findFirst()
-                        .map(
-                                idx -> {
-                                    if (idx == 0) {
-                                        return orderedUnits.get(0);
-                                    } else {
-                                        return orderedUnits.get(idx - 1);
-                                    }
-                                })
-                        .orElse(TimeUnit.MILLISECONDS);
+        TimeUnit highestIntegerUnit = null;
+        for (TimeUnit timeUnit : orderedUnits) {
+            if (nanos % timeUnit.unit.getDuration().toNanos() != 0) {
+                break;
+            }
+            highestIntegerUnit = timeUnit;
+        }
 
-        return String.format(
-                "%d %s",
-                nanos / highestIntegerUnit.unit.getDuration().toNanos(),
-                highestIntegerUnit.getLabels().get(0));
+        return checkNotNull(highestIntegerUnit, "Should find a 
highestIntegerUnit.");
     }
 
     /** Enum which defines time unit, mostly used to parse value from 
configuration file. */
diff --git 
a/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java
 
b/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java
index 9862e993b74..d23ae0090d1 100644
--- 
a/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java
@@ -23,6 +23,7 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -38,7 +39,11 @@ class TimeUtilsPrettyPrintingTest {
                 Arguments.of(Duration.ofMillis(200), "200 ms"),
                 Arguments.of(Duration.ofHours(1).plusSeconds(3), "3603 s"),
                 Arguments.of(Duration.ofSeconds(0), "0 ms"),
-                Arguments.of(Duration.ofMillis(60000), "1 min"));
+                Arguments.of(Duration.ofMillis(60000), "1 min"),
+                Arguments.of(Duration.ofHours(23), "23 h"),
+                Arguments.of(Duration.ofMillis(-1), "-1 ms"),
+                Arguments.of(Duration.ofMillis(TimeUnit.DAYS.toMillis(1)), "1 
d"),
+                Arguments.of(Duration.ofHours(24), "1 d"));
     }
 
     @ParameterizedTest
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index d796dc20f4d..3588b92aae7 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -2386,10 +2386,10 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[a, window_start, window_end, EXPR$3])
-+- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[86400000 ms], offset=[8 h])], select=[a, COUNT(*) 
AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
++- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[1 d], offset=[8 h])], select=[a, COUNT(*) AS 
EXPR$3, start('w$) AS window_start, end('w$) AS window_end])
    +- Exchange(distribution=[hash[a]])
       +- Calc(select=[a, window_start, window_end])
-         +- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], 
size=[86400000 ms], offset=[8 h])], select=[a, b, start('w$) AS window_start, 
end('w$) AS window_end])
+         +- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], 
size=[1 d], offset=[8 h])], select=[a, b, start('w$) AS window_start, end('w$) 
AS window_end])
             +- Exchange(distribution=[hash[a, b]])
                +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
                   +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], 
fields=[a, b, c, rowtime])
@@ -2434,13 +2434,13 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[a, window_start, window_end, EXPR$3])
-+- GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(win_end=[$window_end], 
size=[86400000 ms], offset=[8 h])], select=[a, COUNT(count1$0) AS EXPR$3, 
start('w$) AS window_start, end('w$) AS window_end])
++- GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(win_end=[$window_end], 
size=[1 d], offset=[8 h])], select=[a, COUNT(count1$0) AS EXPR$3, start('w$) AS 
window_start, end('w$) AS window_end])
    +- Exchange(distribution=[hash[a]])
-      +- LocalWindowAggregate(groupBy=[a], 
window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[86400000 
ms], offset=[8 h])], select=[a, COUNT(*) AS count1$0, slice_end('w$) AS 
$window_end])
+      +- LocalWindowAggregate(groupBy=[a], 
window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 d], 
offset=[8 h])], select=[a, COUNT(*) AS count1$0, slice_end('w$) AS $window_end])
          +- Calc(select=[a, window_start, window_end])
-            +- GlobalWindowAggregate(groupBy=[a, b], 
window=[TUMBLE(slice_end=[$slice_end], size=[86400000 ms], offset=[8 h])], 
select=[a, b, start('w$) AS window_start, end('w$) AS window_end])
+            +- GlobalWindowAggregate(groupBy=[a, b], 
window=[TUMBLE(slice_end=[$slice_end], size=[1 d], offset=[8 h])], select=[a, 
b, start('w$) AS window_start, end('w$) AS window_end])
                +- Exchange(distribution=[hash[a, b]])
-                  +- LocalWindowAggregate(groupBy=[a, b], 
window=[TUMBLE(time_col=[rowtime], size=[86400000 ms], offset=[8 h])], 
select=[a, b, slice_end('w$) AS $slice_end])
+                  +- LocalWindowAggregate(groupBy=[a, b], 
window=[TUMBLE(time_col=[rowtime], size=[1 d], offset=[8 h])], select=[a, b, 
slice_end('w$) AS $slice_end])
                      +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
                         +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], 
fields=[a, b, c, rowtime])
 ]]>

Reply via email to