This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b3e3dce8df65670bf82f2e6cd936593bbccd1924 Author: Rui Fan <[email protected]> AuthorDate: Tue Dec 5 19:54:29 2023 +0800 [FLINK-33752][Configuration] Change the displayed timeunit to day when the duration is an integral multiple of 1 day --- .../main/java/org/apache/flink/util/TimeUtils.java | 45 ++++++++++------------ .../flink/util/TimeUtilsPrettyPrintingTest.java | 7 +++- .../plan/stream/sql/agg/WindowAggregateTest.xml | 12 +++--- 3 files changed, 33 insertions(+), 31 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java b/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java index 8127989fcdb..f4b31cff35f 100644 --- a/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/TimeUtils.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.stream.Collectors; -import java.util.stream.IntStream; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -139,7 +138,19 @@ public class TimeUtils { public static String formatWithHighestUnit(Duration duration) { long nanos = duration.toNanos(); - List<TimeUnit> orderedUnits = + TimeUnit highestIntegerUnit = getHighestIntegerUnit(nanos); + return String.format( + "%d %s", + nanos / highestIntegerUnit.unit.getDuration().toNanos(), + highestIntegerUnit.getLabels().get(0)); + } + + private static TimeUnit getHighestIntegerUnit(long nanos) { + if (nanos == 0) { + return TimeUnit.MILLISECONDS; + } + + final List<TimeUnit> orderedUnits = Arrays.asList( TimeUnit.NANOSECONDS, TimeUnit.MICROSECONDS, @@ -149,29 +160,15 @@ public class TimeUtils { TimeUnit.HOURS, TimeUnit.DAYS); - TimeUnit highestIntegerUnit = - IntStream.range(0, orderedUnits.size()) - .sequential() - .filter( - idx -> - nanos % orderedUnits.get(idx).unit.getDuration().toNanos() - != 0) - .boxed() - .findFirst() - .map( - idx -> { - if (idx == 0) { - return orderedUnits.get(0); - } else { - return orderedUnits.get(idx - 1); - } - }) - .orElse(TimeUnit.MILLISECONDS); + TimeUnit highestIntegerUnit = null; + for (TimeUnit timeUnit : orderedUnits) { + if (nanos % timeUnit.unit.getDuration().toNanos() != 0) { + break; + } + highestIntegerUnit = timeUnit; + } - return String.format( - "%d %s", - nanos / highestIntegerUnit.unit.getDuration().toNanos(), - highestIntegerUnit.getLabels().get(0)); + return checkNotNull(highestIntegerUnit, "Should find a highestIntegerUnit."); } /** Enum which defines time unit, mostly used to parse value from configuration file. */ diff --git a/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java b/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java index 9862e993b74..d23ae0090d1 100644 --- a/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/TimeUtilsPrettyPrintingTest.java @@ -23,6 +23,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import java.time.Duration; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -38,7 +39,11 @@ class TimeUtilsPrettyPrintingTest { Arguments.of(Duration.ofMillis(200), "200 ms"), Arguments.of(Duration.ofHours(1).plusSeconds(3), "3603 s"), Arguments.of(Duration.ofSeconds(0), "0 ms"), - Arguments.of(Duration.ofMillis(60000), "1 min")); + Arguments.of(Duration.ofMillis(60000), "1 min"), + Arguments.of(Duration.ofHours(23), "23 h"), + Arguments.of(Duration.ofMillis(-1), "-1 ms"), + Arguments.of(Duration.ofMillis(TimeUnit.DAYS.toMillis(1)), "1 d"), + Arguments.of(Duration.ofHours(24), "1 d")); } @ParameterizedTest diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml index d796dc20f4d..3588b92aae7 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml @@ -2386,10 +2386,10 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[a, window_start, window_end, EXPR$3]) -+- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[86400000 ms], offset=[8 h])], select=[a, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end]) ++- WindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 d], offset=[8 h])], select=[a, COUNT(*) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, window_start, window_end]) - +- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], size=[86400000 ms], offset=[8 h])], select=[a, b, start('w$) AS window_start, end('w$) AS window_end]) + +- WindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], size=[1 d], offset=[8 h])], select=[a, b, start('w$) AS window_start, end('w$) AS window_end]) +- Exchange(distribution=[hash[a, b]]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], fields=[a, b, c, rowtime]) @@ -2434,13 +2434,13 @@ LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()]) <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[a, window_start, window_end, EXPR$3]) -+- GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(win_end=[$window_end], size=[86400000 ms], offset=[8 h])], select=[a, COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end]) ++- GlobalWindowAggregate(groupBy=[a], window=[TUMBLE(win_end=[$window_end], size=[1 d], offset=[8 h])], select=[a, COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end]) +- Exchange(distribution=[hash[a]]) - +- LocalWindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[86400000 ms], offset=[8 h])], select=[a, COUNT(*) AS count1$0, slice_end('w$) AS $window_end]) + +- LocalWindowAggregate(groupBy=[a], window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 d], offset=[8 h])], select=[a, COUNT(*) AS count1$0, slice_end('w$) AS $window_end]) +- Calc(select=[a, window_start, window_end]) - +- GlobalWindowAggregate(groupBy=[a, b], window=[TUMBLE(slice_end=[$slice_end], size=[86400000 ms], offset=[8 h])], select=[a, b, start('w$) AS window_start, end('w$) AS window_end]) + +- GlobalWindowAggregate(groupBy=[a, b], window=[TUMBLE(slice_end=[$slice_end], size=[1 d], offset=[8 h])], select=[a, b, start('w$) AS window_start, end('w$) AS window_end]) +- Exchange(distribution=[hash[a, b]]) - +- LocalWindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], size=[86400000 ms], offset=[8 h])], select=[a, b, slice_end('w$) AS $slice_end]) + +- LocalWindowAggregate(groupBy=[a, b], window=[TUMBLE(time_col=[rowtime], size=[1 d], offset=[8 h])], select=[a, b, slice_end('w$) AS $slice_end]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, rowtime], metadata=[]]], fields=[a, b, c, rowtime]) ]]>
