This is an automated email from the ASF dual-hosted git repository.
xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 67fbbcb04 [AMORO-3632] Refine data expiration literal calculation for
date type (#3964)
67fbbcb04 is described below
commit 67fbbcb04213ef8ac7cb5faff02eeb0c347ee5c4
Author: davedwwang <[email protected]>
AuthorDate: Wed Dec 10 17:12:18 2025 +0800
[AMORO-3632] Refine data expiration literal calculation for date type
(#3964)
* [Feature]: data-retention, add support for partition column type Date
#3632
* [AMORO-3632]: data-retention, add support for partition column type Date
* [AMORO-3632]: data-retention, add support for partition column type Date
* [AMORO-3632]: data-retention, add support for partition column type Date
* [AMORO-3632] Fix #3665: Prevent long overflow in Date expiration
calculation and fix related tests
---------
---
.../maintainer/IcebergTableMaintainer.java | 40 ++++
.../amoro/server/table/TableConfigurations.java | 3 +-
.../optimizing/maintainer/TestDataExpire.java | 247 +++++++++++++++++++++
3 files changed, 289 insertions(+), 1 deletion(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
index 4cf587a97..83382e672 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
@@ -729,6 +729,36 @@ public class IcebergTableMaintainer implements
TableMaintainer {
return expiredFiles;
}
+ private static int calExpireDaysForDate(long expireTimestamp, Types.NestedField field) {
+ // Round up to next day to ensure inclusive behavior for date boundaries
+ // This allows the existing < comparison to work correctly for Date types
+ LocalDate expireDate =
+ Instant.ofEpochMilli(expireTimestamp)
+ .atZone(getDefaultZoneId(field)) //
+ .toLocalDate();
+
+ int expireDays = (int) expireDate.plusDays(1).toEpochDay();
+
+ return expireDays;
+ }
+
+ /**
+ * Convert expiration timestamp to the appropriate value based on field type.
+ *
+ * <p>This method handles different field types for data expiration:
+ *
+ * <ul>
+ * <li>TIMESTAMP: converts milliseconds to microseconds
+ * <li>LONG: handles both millisecond and second formats
+ * <li>STRING: formats timestamp as date string using configured pattern
+ * <li>DATE: converts timestamp to days since epoch
+ * </ul>
+ *
+ * @param expirationConfig expiration configuration containing format patterns
+ * @param field the field being used for expiration
+ * @param expireTimestamp timestamp in milliseconds for expiration boundary
+ * @return comparable value appropriate for the field type
+ */
private Comparable<?> getExpireValue(
DataExpirationConfig expirationConfig, Types.NestedField field, long expireTimestamp) {
switch (field.type().typeId()) {
@@ -750,6 +780,9 @@ public class IcebergTableMaintainer implements
TableMaintainer {
.format(
DateTimeFormatter.ofPattern(
expirationConfig.getDateTimePattern(),
Locale.getDefault()));
+
+ case DATE:
+ return calExpireDaysForDate(expireTimestamp, field);
default:
throw new IllegalArgumentException(
"Unsupported expiration field type: " + field.type().typeId());
@@ -790,6 +823,9 @@ public class IcebergTableMaintainer implements
TableMaintainer {
DateTimeFormatter.ofPattern(
expirationConfig.getDateTimePattern(),
Locale.getDefault()));
return Expressions.lessThanOrEqual(field.name(), expireDateTime);
+ case DATE:
+ int expireDays = calExpireDaysForDate(expireTimestamp, field);
+ return Expressions.lessThan(field.name(), expireDays);
default:
return Expressions.alwaysTrue();
}
@@ -994,6 +1030,10 @@ public class IcebergTableMaintainer implements
TableMaintainer {
.atZone(getDefaultZoneId(field))
.toInstant()
.toEpochMilli());
+ } else if (type.typeId() == Type.TypeID.DATE) {
+ if (upperBound instanceof Integer) {
+ literal = Literal.of(((Integer) upperBound).longValue() * 24 * 60 * 60 * 1000);
+ }
}
return literal;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
index 9b469bd32..f5b239ff4 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
@@ -190,7 +190,8 @@ public class TableConfigurations {
}
public static final Set<Type.TypeID> DATA_EXPIRATION_FIELD_TYPES =
- Sets.newHashSet(Type.TypeID.TIMESTAMP, Type.TypeID.STRING, Type.TypeID.LONG);
+ Sets.newHashSet(
+ Type.TypeID.TIMESTAMP, Type.TypeID.STRING, Type.TypeID.LONG, Type.TypeID.DATE);
private static boolean validateExpirationField(
Types.NestedField field, String name, String expirationField) {
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
index 9e10d94c3..75b3e9a60 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
@@ -144,6 +144,38 @@ public class TestDataExpire extends ExecutorTestBase {
PrimaryKeySpec.noPrimaryKey(),
PartitionSpec.unpartitioned(),
getDefaultProp())
+ },
+ // Mixed format partitioned by date type
+ {
+ new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(TABLE_SCHEMA3, PRIMARY_KEY_SPEC, SPEC3,
getDefaultProp())
+ },
+ {
+ new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(TABLE_SCHEMA3, PRIMARY_KEY_SPEC, SPEC4,
getDefaultProp())
+ },
+ {
+ new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(
+ TABLE_SCHEMA3, PRIMARY_KEY_SPEC, PartitionSpec.unpartitioned(),
getDefaultProp())
+ },
+ {
+ new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(
+ TABLE_SCHEMA3, PrimaryKeySpec.noPrimaryKey(), SPEC3,
getDefaultProp())
+ },
+ {
+ new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(
+ TABLE_SCHEMA3, PrimaryKeySpec.noPrimaryKey(), SPEC4,
getDefaultProp())
+ },
+ {
+ new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
+ new BasicTableTestHelper(
+ TABLE_SCHEMA3,
+ PrimaryKeySpec.noPrimaryKey(),
+ PartitionSpec.unpartitioned(),
+ getDefaultProp())
}
};
}
@@ -165,6 +197,19 @@ public class TestDataExpire extends ExecutorTestBase {
public static final PartitionSpec SPEC2 =
PartitionSpec.builderFor(TABLE_SCHEMA2).identity("op_time").build();
+ public static final Schema TABLE_SCHEMA3 =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "name", Types.StringType.get()),
+ Types.NestedField.required(3, "ts", Types.LongType.get()),
+ Types.NestedField.required(4, "op_time", Types.DateType.get()));
+
+ public static final PartitionSpec SPEC3 =
+ PartitionSpec.builderFor(TABLE_SCHEMA3).identity("op_time").build();
+
+ public static final PartitionSpec SPEC4 =
+ PartitionSpec.builderFor(TABLE_SCHEMA3).day("op_time").build();
+
public TestDataExpire(CatalogTestHelper catalogTestHelper, TableTestHelper
tableTestHelper) {
super(catalogTestHelper, tableTestHelper);
}
@@ -201,6 +246,10 @@ public class TestDataExpire extends ExecutorTestBase {
expected =
Lists.newArrayList(
createRecord(2, "222", parseMillis("2022-01-03T12:00:00"),
"2022-01-03T12:00:00"));
+ } else if (expireByDate()) {
+ expected =
+ Lists.newArrayList(
+ createRecord(2, "222", parseMillis("2022-01-03T12:00:00"),
"2022-01-03T12:00:00"));
} else {
expected =
Lists.newArrayList(
@@ -256,6 +305,8 @@ public class TestDataExpire extends ExecutorTestBase {
if (tableTestHelper().partitionSpec().isPartitioned()) {
if (expireByStringDate()) {
assertScanResult(scanAfterExpire, 1, 0);
+ } else if (expireByDate()) {
+ assertScanResult(scanAfterExpire, 1, 0);
} else {
assertScanResult(scanAfterExpire, 3, 0);
}
@@ -270,6 +321,10 @@ public class TestDataExpire extends ExecutorTestBase {
expected =
Lists.newArrayList(
createRecord(2, "222", parseMillis("2022-01-03T12:00:00"),
"2022-01-03T12:00:00"));
+ } else if (expireByDate()) {
+ expected =
+ Lists.newArrayList(
+ createRecord(2, "222", parseMillis("2022-01-03T12:00:00"),
"2022-01-03T12:00:00"));
} else {
expected =
Lists.newArrayList(
@@ -373,6 +428,10 @@ public class TestDataExpire extends ExecutorTestBase {
expected =
Lists.newArrayList(
createRecord(2, "222", parseMillis("2022-01-03T12:00:00"),
"2022-01-03T12:00:00"));
+ } else if (expireByDate()) {
+ expected =
+ Lists.newArrayList(
+ createRecord(2, "222", parseMillis("2022-01-03T12:00:00"),
"2022-01-03T12:00:00"));
} else {
expected =
Lists.newArrayList(
@@ -549,6 +608,9 @@ public class TestDataExpire extends ExecutorTestBase {
.toInstant(ZoneOffset.UTC)
.toEpochMilli();
break;
+ case DATE:
+ time =
LocalDateTime.parse(opTime).atZone(ZoneOffset.UTC).toLocalDate();
+ break;
default:
time = opTime;
}
@@ -637,4 +699,189 @@ public class TestDataExpire extends ExecutorTestBase {
Map<String, String> properties = table.properties();
return TableConfigurations.parseDataExpirationConfig(properties);
}
+
+ private boolean expireByDate() {
+ String expireField =
+ CompatiblePropertyUtil.propertyAsString(
+ getMixedTable().properties(),
TableProperties.DATA_EXPIRATION_FIELD, "");
+ Types.NestedField field = getMixedTable().schema().findField(expireField);
+ return field != null && field.type().typeId().equals(Type.TypeID.DATE);
+ }
+
+ @Test
+ public void testDateTypeBoundaryConditions() {
+ assumeTrue(expireByDate());
+
+ DataExpirationConfig config = parseDataExpirationConfig(getMixedTable());
+ Types.NestedField field =
getMixedTable().schema().findField(config.getExpirationField());
+ List<Record> records =
+ Lists.newArrayList(
+ createRecord(1, "111", parseMillis("2022-01-03T00:00:00"),
"2022-01-03T00:00:00"),
+ createRecord(2, "222", parseMillis("2022-01-03T00:00:01"),
"2022-01-03T00:00:01"),
+ createRecord(3, "333", parseMillis("2022-01-03T23:59:59"),
"2022-01-03T23:59:59"),
+ createRecord(4, "444", parseMillis("2022-01-04T00:00:00"),
"2022-01-04T00:00:00"));
+
+ records.forEach(
+ r ->
+ OptimizingTestHelpers.appendBase(
+ getMixedTable(),
+ tableTestHelper()
+ .writeBaseStore(getMixedTable(), 0, Lists.newArrayList(r),
false)));
+
+ getMaintainerAndExpire(config, "2022-01-04T12:00:00.000");
+
+ List<Record> result = readSortedBaseRecords(getMixedTable());
+
+ List<Record> expected;
+ if (getMixedTable().spec().isPartitioned()) {
+ expected =
+ Lists.newArrayList(
+ createRecord(4, "444", parseMillis("2022-01-04T00:00:00"),
"2022-01-04T00:00:00"));
+
+ } else {
+ expected =
+ Lists.newArrayList(
+ createRecord(1, "111", parseMillis("2022-01-03T00:00:00"),
"2022-01-03T00:00:00"),
+ createRecord(2, "222", parseMillis("2022-01-03T00:00:01"),
"2022-01-03T00:00:01"),
+ createRecord(3, "333", parseMillis("2022-01-03T23:59:59"),
"2022-01-03T23:59:59"),
+ createRecord(4, "444", parseMillis("2022-01-04T00:00:00"),
"2022-01-04T00:00:00"));
+ }
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testDateTypeBoundaryConditionsFileLevel() {
+ assumeTrue(expireByDate());
+
+ getMixedTable()
+ .updateProperties()
+ .set(TableProperties.DATA_EXPIRATION_LEVEL,
DataExpirationConfig.ExpireLevel.FILE.name())
+ .commit();
+
+ List<Record> records =
+ Lists.newArrayList(
+ createRecord(1, "111", parseMillis("2022-01-03T00:00:00"),
"2022-01-03T00:00:00"),
+ createRecord(2, "222", parseMillis("2022-01-03T00:00:01"),
"2022-01-03T00:00:01"),
+ createRecord(3, "333", parseMillis("2022-01-03T23:59:59"),
"2022-01-03T23:59:59"),
+ createRecord(4, "444", parseMillis("2022-01-04T00:00:00"),
"2022-01-04T00:00:00"));
+
+ records.forEach(
+ r ->
+ OptimizingTestHelpers.appendBase(
+ getMixedTable(),
+ tableTestHelper()
+ .writeBaseStore(getMixedTable(), 0, Lists.newArrayList(r),
false)));
+
+ DataExpirationConfig config = parseDataExpirationConfig(getMixedTable());
+ getMaintainerAndExpire(config, "2022-01-04T12:00:00.000");
+
+ List<Record> result = readSortedBaseRecords(getMixedTable());
+ List<Record> expected =
+ Lists.newArrayList(
+ createRecord(4, "444", parseMillis("2022-01-04T00:00:00"),
"2022-01-04T00:00:00"));
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testDateTypeBoundaryConditionsPartitionLevel() {
+ assumeTrue(expireByDate() &&
tableTestHelper().partitionSpec().isPartitioned());
+
+ List<Record> records =
+ Lists.newArrayList(
+ createRecord(1, "111", parseMillis("2022-01-03T00:00:00"),
"2022-01-03T00:00:00"),
+ createRecord(2, "222", parseMillis("2022-01-03T23:59:59"),
"2022-01-03T23:59:59"),
+ createRecord(3, "333", parseMillis("2022-01-04T00:00:00"),
"2022-01-04T00:00:00"),
+ createRecord(4, "444", parseMillis("2022-01-04T23:59:59"),
"2022-01-04T23:59:59"));
+
+ OptimizingTestHelpers.appendBase(
+ getMixedTable(), tableTestHelper().writeBaseStore(getMixedTable(), 0,
records, false));
+
+ DataExpirationConfig config = parseDataExpirationConfig(getMixedTable());
+ getMaintainerAndExpire(config, "2022-01-05T12:00:00.000");
+
+ List<Record> result = readSortedBaseRecords(getMixedTable());
+
+ Assert.assertEquals(result.size(), 0);
+ }
+
+ @Test
+ public void testDateTypeBoundaryConditionsKeyedTable() {
+ assumeTrue(expireByDate() && isKeyedTable());
+
+ KeyedTable keyedTable = getMixedTable().asKeyedTable();
+
+ List<Record> baseRecords =
+ Lists.newArrayList(
+ createRecord(1, "111", parseMillis("2022-01-03T00:00:00"),
"2022-01-03T00:00:00"),
+ createRecord(2, "222", parseMillis("2022-01-04T00:00:00"),
"2022-01-04T00:00:00"));
+ OptimizingTestHelpers.appendBase(
+ keyedTable, tableTestHelper().writeBaseStore(keyedTable, 0,
baseRecords, false));
+
+ List<Record> changeRecords =
+ Lists.newArrayList(
+ createRecord(3, "333", parseMillis("2022-01-03T23:59:59"),
"2022-01-03T23:59:59"),
+ createRecord(4, "444", parseMillis("2022-01-04T00:00:01"),
"2022-01-04T00:00:01"));
+ OptimizingTestHelpers.appendChange(
+ keyedTable,
+ tableTestHelper()
+ .writeChangeStore(keyedTable, 1L, ChangeAction.INSERT,
changeRecords, false));
+
+ DataExpirationConfig config = parseDataExpirationConfig(keyedTable);
+ MixedTableMaintainer tableMaintainer = new
MixedTableMaintainer(keyedTable, null);
+ tableMaintainer.expireDataFrom(
+ config,
+ LocalDateTime.parse("2022-01-04T12:00:00.000")
+ .atZone(
+ IcebergTableMaintainer.getDefaultZoneId(
+ keyedTable.schema().findField(config.getExpirationField())))
+ .toInstant());
+
+ List<Record> records = readSortedKeyedRecords(keyedTable);
+ List<Record> expected;
+ if (getMixedTable().spec().isPartitioned()) {
+ expected =
+ Lists.newArrayList(
+ createRecord(2, "222", parseMillis("2022-01-04T00:00:00"),
"2022-01-04T00:00:00"),
+ createRecord(4, "444", parseMillis("2022-01-04T00:00:01"),
"2022-01-04T00:00:01"));
+ } else {
+ expected =
+ Lists.newArrayList(
+ createRecord(1, "111", parseMillis("2022-01-03T00:00:00"),
"2022-01-03T00:00:00"),
+ createRecord(2, "222", parseMillis("2022-01-04T00:00:00"),
"2022-01-04T00:00:00"),
+ createRecord(3, "333", parseMillis("2022-01-03T23:59:59"),
"2022-01-03T23:59:59"),
+ createRecord(4, "444", parseMillis("2022-01-04T00:00:01"),
"2022-01-04T00:00:01"));
+ }
+
+ Assert.assertEquals(expected, records);
+ }
+
+ @Test
+ public void testDateTypeCrossDateBoundaryScenarios() {
+ assumeTrue(expireByDate());
+
+ List<Record> records =
+ Lists.newArrayList(
+ createRecord(1, "111", parseMillis("2022-01-31T23:59:59"),
"2022-01-31T23:59:59"),
+ createRecord(2, "222", parseMillis("2022-02-01T00:00:00"),
"2022-02-01T00:00:00"),
+ createRecord(3, "333", parseMillis("2021-12-31T23:59:59"),
"2021-12-31T23:59:59"),
+ createRecord(4, "444", parseMillis("2022-01-01T00:00:00"),
"2022-01-01T00:00:00"),
+ createRecord(5, "555", parseMillis("2020-02-29T12:00:00"),
"2020-02-29T12:00:00"),
+ createRecord(6, "666", parseMillis("2020-03-01T00:00:00"),
"2020-03-01T00:00:00"));
+
+ records.forEach(
+ r ->
+ OptimizingTestHelpers.appendBase(
+ getMixedTable(),
+ tableTestHelper()
+ .writeBaseStore(getMixedTable(), 0, Lists.newArrayList(r),
false)));
+
+ DataExpirationConfig config = parseDataExpirationConfig(getMixedTable());
+ getMaintainerAndExpire(config, "2022-02-02T12:00:00.000");
+
+ List<Record> result = readSortedBaseRecords(getMixedTable());
+
+ Assert.assertEquals(result.size(), 0);
+ }
}