lsyldliu commented on code in PR #25108:
URL: https://github.com/apache/flink/pull/25108#discussion_r1685891811
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -1520,6 +1520,58 @@ void
testRefreshMaterializedTableWithInvalidParameterInContinuousMode() throws E
.asSerializableString()));
}
+ @Test
+ void testMaterializedTableDefinitionQueryContainsTemporaryResources()
throws Exception {
Review Comment:
Is this test unrelated to your fix?
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -1417,7 +1417,7 @@ void
testPeriodicRefreshMaterializedTableWithPartitionOptions() throws Exception
sessionHandle,
materializedTableIdentifier.asSerializableString(),
true,
- "2024-01-02 00:00:00",
+ "2024-01-03 00:00:00",
Review Comment:
I found that this test should be optimized as follows:
```
List<Row> data = new ArrayList<>();
// create materialized table with partition formatter
createAndVerifyCreateMaterializedTableWithData(
"my_materialized_table",
data,
Collections.singletonMap("ds", "yyyy-MM-dd"),
RefreshMode.FULL);
ObjectIdentifier materializedTableIdentifier =
ObjectIdentifier.of(
fileSystemCatalogName, TEST_DEFAULT_DATABASE,
"my_materialized_table");
// add data to all data list
data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
data.add(Row.of(2L, 2L, 2L, "2024-01-02"));
data.add(Row.of(4L, 4L, 4L, "2024-01-02"));
// refresh the materialized table with period schedule
long startTime = System.currentTimeMillis();
OperationHandle periodRefreshTableHandle =
service.refreshMaterializedTable(
sessionHandle,
materializedTableIdentifier.asSerializableString(),
true,
"2024-01-03 00:00:00",
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap());
```
Only then does the periodic refresh match full mode rather than continuous mode.
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java:
##########
@@ -99,56 +103,340 @@ void testGenerateInsertStatementWithDynamicOptions() {
assertThat(actualStatement).isEqualTo(expectedStatement);
}
- @Test
- void testGetPeriodRefreshPartition() {
- String schedulerTime = "2024-01-01 00:00:00";
- Map<String, String> tableOptions = new HashMap<>();
- tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd");
- tableOptions.put("partition.fields.hour.date-formatter", "HH");
-
+ @ParameterizedTest(name = "{index}: {0}")
+ @MethodSource("testData")
+ void testGetPeriodRefreshPartition(TestSpec testSpec) {
ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog",
"database", "table");
- Map<String, String> actualRefreshPartition =
- MaterializedTableManager.getPeriodRefreshPartition(
- schedulerTime, objectIdentifier, tableOptions,
ZoneId.systemDefault());
- Map<String, String> expectedRefreshPartition = new HashMap<>();
- expectedRefreshPartition.put("day", "2024-01-01");
- expectedRefreshPartition.put("hour", "00");
+ if (testSpec.errorMessage == null) {
+ Map<String, String> actualRefreshPartition =
+ MaterializedTableManager.getPeriodRefreshPartition(
+ testSpec.schedulerTime,
+ testSpec.freshness,
+ objectIdentifier,
+ testSpec.tableOptions,
+ ZoneId.systemDefault());
- assertThat(actualRefreshPartition).isEqualTo(expectedRefreshPartition);
+
assertThat(actualRefreshPartition).isEqualTo(testSpec.expectedRefreshPartition);
+ } else {
+ assertThatThrownBy(
+ () ->
+
MaterializedTableManager.getPeriodRefreshPartition(
+ testSpec.schedulerTime,
+ testSpec.freshness,
+ objectIdentifier,
+ testSpec.tableOptions,
+ ZoneId.systemDefault()))
+ .hasMessage(testSpec.errorMessage);
+ }
}
- @Test
- void testGetPeriodRefreshPartitionWithInvalidSchedulerTime() {
- // scheduler time is null
- Map<String, String> tableOptions = new HashMap<>();
- tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd");
- tableOptions.put("partition.fields.hour.date-formatter", "HH");
+ static Stream<TestSpec> testData() {
+ return Stream.of(
+ // The interval of freshness match the partition specified by
the 'date-formatter'.
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofDay("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .expectedRefreshPartition("day", "2023-12-31"),
+ TestSpec.create()
+ .schedulerTime("2024-01-02 00:00:00")
+ .freshness(IntervalFreshness.ofDay("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .expectedRefreshPartition("day", "2024-01-01"),
+ TestSpec.create()
+ .schedulerTime("2024-01-02 00:00:00")
+ .freshness(IntervalFreshness.ofHour("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2024-01-01")
+ .expectedRefreshPartition("hour", "23"),
+ TestSpec.create()
+ .schedulerTime("2024-01-02 01:00:00")
+ .freshness(IntervalFreshness.ofHour("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2024-01-02")
+ .expectedRefreshPartition("hour", "00"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("2"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "22"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("4"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "20"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("8"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "16"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("12"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "12"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "59"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("2"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "58"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("4"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "56"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("5"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "55"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("6"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "54"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("10"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "50"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("12"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "48"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("15"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "45"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("30"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "30"),
- ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog",
"database", "table");
+ // The interval of freshness is larger than the partition
specified by the
+ // 'date-formatter'.
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofDay("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "00"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofDay("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "00")
+ .expectedRefreshPartition("minute", "00"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "00"),
Review Comment:
Add test case:
```
TestSpec.create()
.schedulerTime("2024-01-01 01:00:00")
.freshness(IntervalFreshness.ofHour("1"))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter", "HH")
.tableOptions("partition.fields.minute.date-formatter", "mm")
.expectedRefreshPartition("day", "2024-01-01")
.expectedRefreshPartition("hour", "00")
.expectedRefreshPartition("minute", "00"),
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java:
##########
@@ -735,12 +736,26 @@ public static String formatTimestampMillis(long ts,
String format, TimeZone tz)
public static String formatTimestampString(
String dateStr, String fromFormat, String toFormat, TimeZone tz) {
+ return formatTimestampStringWithOffset(dateStr, fromFormat, toFormat,
tz, 0);
+ }
+
+ public static String formatTimestampStringWithOffset(
+ String dateStr, String fromFormat, String toFormat, TimeZone tz,
long offsetMills) {
SimpleDateFormat fromFormatter = FORMATTER_CACHE.get(fromFormat);
fromFormatter.setTimeZone(tz);
SimpleDateFormat toFormatter = FORMATTER_CACHE.get(toFormat);
toFormatter.setTimeZone(tz);
try {
- return toFormatter.format(fromFormatter.parse(dateStr));
+ Date date = fromFormatter.parse(dateStr);
Review Comment:
Can we change the code as follows:
```
Date date = fromFormatter.parse(dateStr);
if (offsetMills != 0) {
date = new Date(date.getTime() + offsetMills);
}
```
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java:
##########
@@ -99,56 +103,340 @@ void testGenerateInsertStatementWithDynamicOptions() {
assertThat(actualStatement).isEqualTo(expectedStatement);
}
- @Test
- void testGetPeriodRefreshPartition() {
- String schedulerTime = "2024-01-01 00:00:00";
- Map<String, String> tableOptions = new HashMap<>();
- tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd");
- tableOptions.put("partition.fields.hour.date-formatter", "HH");
-
+ @ParameterizedTest(name = "{index}: {0}")
+ @MethodSource("testData")
+ void testGetPeriodRefreshPartition(TestSpec testSpec) {
ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog",
"database", "table");
- Map<String, String> actualRefreshPartition =
- MaterializedTableManager.getPeriodRefreshPartition(
- schedulerTime, objectIdentifier, tableOptions,
ZoneId.systemDefault());
- Map<String, String> expectedRefreshPartition = new HashMap<>();
- expectedRefreshPartition.put("day", "2024-01-01");
- expectedRefreshPartition.put("hour", "00");
+ if (testSpec.errorMessage == null) {
+ Map<String, String> actualRefreshPartition =
+ MaterializedTableManager.getPeriodRefreshPartition(
+ testSpec.schedulerTime,
+ testSpec.freshness,
+ objectIdentifier,
+ testSpec.tableOptions,
+ ZoneId.systemDefault());
- assertThat(actualRefreshPartition).isEqualTo(expectedRefreshPartition);
+
assertThat(actualRefreshPartition).isEqualTo(testSpec.expectedRefreshPartition);
+ } else {
+ assertThatThrownBy(
+ () ->
+
MaterializedTableManager.getPeriodRefreshPartition(
+ testSpec.schedulerTime,
+ testSpec.freshness,
+ objectIdentifier,
+ testSpec.tableOptions,
+ ZoneId.systemDefault()))
+ .hasMessage(testSpec.errorMessage);
+ }
}
- @Test
- void testGetPeriodRefreshPartitionWithInvalidSchedulerTime() {
- // scheduler time is null
- Map<String, String> tableOptions = new HashMap<>();
- tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd");
- tableOptions.put("partition.fields.hour.date-formatter", "HH");
+ static Stream<TestSpec> testData() {
+ return Stream.of(
+ // The interval of freshness match the partition specified by
the 'date-formatter'.
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofDay("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .expectedRefreshPartition("day", "2023-12-31"),
+ TestSpec.create()
+ .schedulerTime("2024-01-02 00:00:00")
+ .freshness(IntervalFreshness.ofDay("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .expectedRefreshPartition("day", "2024-01-01"),
+ TestSpec.create()
+ .schedulerTime("2024-01-02 00:00:00")
+ .freshness(IntervalFreshness.ofHour("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2024-01-01")
+ .expectedRefreshPartition("hour", "23"),
+ TestSpec.create()
+ .schedulerTime("2024-01-02 01:00:00")
+ .freshness(IntervalFreshness.ofHour("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2024-01-02")
+ .expectedRefreshPartition("hour", "00"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("2"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "22"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("4"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "20"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("8"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "16"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("12"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "12"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "59"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("2"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "58"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("4"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "56"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("5"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "55"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("6"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "54"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("10"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "50"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("12"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "48"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("15"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "45"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("30"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "30"),
Review Comment:
Please add a test case for the current day:
```
TestSpec.create()
.schedulerTime("2024-01-01 00:30:00")
.freshness(IntervalFreshness.ofMinute("30"))
.tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
.tableOptions("partition.fields.hour.date-formatter", "HH")
.tableOptions("partition.fields.minute.date-formatter", "mm")
.expectedRefreshPartition("day", "2024-01-01")
.expectedRefreshPartition("hour", "00")
.expectedRefreshPartition("minute", "00"),
```
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManagerTest.java:
##########
@@ -99,56 +103,340 @@ void testGenerateInsertStatementWithDynamicOptions() {
assertThat(actualStatement).isEqualTo(expectedStatement);
}
- @Test
- void testGetPeriodRefreshPartition() {
- String schedulerTime = "2024-01-01 00:00:00";
- Map<String, String> tableOptions = new HashMap<>();
- tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd");
- tableOptions.put("partition.fields.hour.date-formatter", "HH");
-
+ @ParameterizedTest(name = "{index}: {0}")
+ @MethodSource("testData")
+ void testGetPeriodRefreshPartition(TestSpec testSpec) {
ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog",
"database", "table");
- Map<String, String> actualRefreshPartition =
- MaterializedTableManager.getPeriodRefreshPartition(
- schedulerTime, objectIdentifier, tableOptions,
ZoneId.systemDefault());
- Map<String, String> expectedRefreshPartition = new HashMap<>();
- expectedRefreshPartition.put("day", "2024-01-01");
- expectedRefreshPartition.put("hour", "00");
+ if (testSpec.errorMessage == null) {
+ Map<String, String> actualRefreshPartition =
+ MaterializedTableManager.getPeriodRefreshPartition(
+ testSpec.schedulerTime,
+ testSpec.freshness,
+ objectIdentifier,
+ testSpec.tableOptions,
+ ZoneId.systemDefault());
- assertThat(actualRefreshPartition).isEqualTo(expectedRefreshPartition);
+
assertThat(actualRefreshPartition).isEqualTo(testSpec.expectedRefreshPartition);
+ } else {
+ assertThatThrownBy(
+ () ->
+
MaterializedTableManager.getPeriodRefreshPartition(
+ testSpec.schedulerTime,
+ testSpec.freshness,
+ objectIdentifier,
+ testSpec.tableOptions,
+ ZoneId.systemDefault()))
+ .hasMessage(testSpec.errorMessage);
+ }
}
- @Test
- void testGetPeriodRefreshPartitionWithInvalidSchedulerTime() {
- // scheduler time is null
- Map<String, String> tableOptions = new HashMap<>();
- tableOptions.put("partition.fields.day.date-formatter", "yyyy-MM-dd");
- tableOptions.put("partition.fields.hour.date-formatter", "HH");
+ static Stream<TestSpec> testData() {
+ return Stream.of(
+ // The interval of freshness match the partition specified by
the 'date-formatter'.
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofDay("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .expectedRefreshPartition("day", "2023-12-31"),
+ TestSpec.create()
+ .schedulerTime("2024-01-02 00:00:00")
+ .freshness(IntervalFreshness.ofDay("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .expectedRefreshPartition("day", "2024-01-01"),
+ TestSpec.create()
+ .schedulerTime("2024-01-02 00:00:00")
+ .freshness(IntervalFreshness.ofHour("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2024-01-01")
+ .expectedRefreshPartition("hour", "23"),
+ TestSpec.create()
+ .schedulerTime("2024-01-02 01:00:00")
+ .freshness(IntervalFreshness.ofHour("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2024-01-02")
+ .expectedRefreshPartition("hour", "00"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("2"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "22"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("4"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "20"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("8"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "16"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("12"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "12"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "59"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("2"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "58"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("4"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "56"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("5"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "55"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("6"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "54"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("10"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "50"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("12"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "48"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("15"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "45"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("30"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "30"),
- ObjectIdentifier objectIdentifier = ObjectIdentifier.of("catalog",
"database", "table");
+ // The interval of freshness is larger than the partition
specified by the
+ // 'date-formatter'.
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofDay("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "00"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofDay("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "00")
+ .expectedRefreshPartition("minute", "00"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+
.tableOptions("partition.fields.minute.date-formatter", "mm")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23")
+ .expectedRefreshPartition("minute", "00"),
+ // The interval of freshness is less than the partition
specified by the
+ // 'date-formatter'.
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .expectedRefreshPartition("day", "2023-12-31"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 01:00:00")
+ .freshness(IntervalFreshness.ofHour("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .expectedRefreshPartition("day", "2024-01-01"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("2"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .expectedRefreshPartition("day", "2023-12-31"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 02:00:00")
+ .freshness(IntervalFreshness.ofHour("2"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .expectedRefreshPartition("day", "2024-01-01"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofHour("4"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .expectedRefreshPartition("day", "2023-12-31"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 04:00:00")
+ .freshness(IntervalFreshness.ofHour("4"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .expectedRefreshPartition("day", "2024-01-01"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("2"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("4"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("15"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .expectedRefreshPartition("day", "2023-12-31")
+ .expectedRefreshPartition("hour", "23"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:00:00")
+ .freshness(IntervalFreshness.ofMinute("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .expectedRefreshPartition("day", "2023-12-31"),
+ TestSpec.create()
+ .schedulerTime("2024-01-01 00:01:00")
+ .freshness(IntervalFreshness.ofMinute("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .expectedRefreshPartition("day", "2024-01-01"),
+
+ // Invalid test case.
+ TestSpec.create()
+ .schedulerTime(null)
+ .freshness(IntervalFreshness.ofDay("1"))
+ .tableOptions("partition.fields.day.date-formatter",
"yyyy-MM-dd")
+ .tableOptions("partition.fields.hour.date-formatter",
"HH")
+ .errorMessage(
+ "Scheduler time not properly set for periodic
refresh of materialized table `catalog`.`database`.`table`."),
Review Comment:
I think this error message is not clear for users; we should emphasize that
the scheduler time is null.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]