This is an automated email from the ASF dual-hosted git repository.

xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 67fbbcb04 [AMORO-3632] Refine data expiration literal calculation for date type (#3964)
67fbbcb04 is described below

commit 67fbbcb04213ef8ac7cb5faff02eeb0c347ee5c4
Author: davedwwang <[email protected]>
AuthorDate: Wed Dec 10 17:12:18 2025 +0800

    [AMORO-3632] Refine data expiration literal calculation for date type (#3964)
    
    * [Feature]: data-retention, add support for partition column type Date #3632
    
    * [AMORO-3632]: data-retention, add support for partition column type Date
    
    * [AMORO-3632]: data-retention, add support for partition column type Date
    
    * [AMORO-3632]: data-retention, add support for partition column type Date
    
    * [AMORO-3632] Fix #3665: Prevent long overflow in Date expiration calculation and fix related tests
    
    ---------
---
 .../maintainer/IcebergTableMaintainer.java         |  40 ++++
 .../amoro/server/table/TableConfigurations.java    |   3 +-
 .../optimizing/maintainer/TestDataExpire.java      | 247 +++++++++++++++++++++
 3 files changed, 289 insertions(+), 1 deletion(-)

diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
index 4cf587a97..83382e672 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/maintainer/IcebergTableMaintainer.java
@@ -729,6 +729,36 @@ public class IcebergTableMaintainer implements TableMaintainer {
     return expiredFiles;
   }
 
+  private static int calExpireDaysForDate(long expireTimestamp, Types.NestedField field) {
+    // Returns an EXCLUSIVE upper bound: the epoch day following the expire date, where the
+    // expire date is the calendar date of expireTimestamp in the field's default zone.
+    // Callers compare with a strict "<", so rows dated on the expire date itself also expire.
+    LocalDate expireDate =
+        Instant.ofEpochMilli(expireTimestamp).atZone(getDefaultZoneId(field)).toLocalDate();
+
+    // DATE values are int days since epoch; toIntExact fails fast instead of silently
+    // wrapping if the bound ever falls outside the int range.
+    return Math.toIntExact(expireDate.plusDays(1).toEpochDay());
+  }
+
+
+  /**
+   * Convert expiration timestamp to the appropriate value based on field type.
+   *
+   * <p>This method handles different field types for data expiration:
+   *
+   * <ul>
+   *   <li>TIMESTAMP: converts milliseconds to microseconds
+   *   <li>LONG: handles both millisecond and second formats
+   *   <li>STRING: formats timestamp as date string using configured pattern
+   *   <li>DATE: converts timestamp to days since epoch
+   * </ul>
+   *
+   * @param expirationConfig expiration configuration containing format 
patterns
+   * @param field the field being used for expiration
+   * @param expireTimestamp timestamp in milliseconds for expiration boundary
+   * @return comparable value appropriate for the field type
+   */
   private Comparable<?> getExpireValue(
       DataExpirationConfig expirationConfig, Types.NestedField field, long 
expireTimestamp) {
     switch (field.type().typeId()) {
@@ -750,6 +780,9 @@ public class IcebergTableMaintainer implements TableMaintainer {
             .format(
                 DateTimeFormatter.ofPattern(
                     expirationConfig.getDateTimePattern(), 
Locale.getDefault()));
+
+      case DATE:
+        return calExpireDaysForDate(expireTimestamp, field);
       default:
         throw new IllegalArgumentException(
             "Unsupported expiration field type: " + field.type().typeId());
@@ -790,6 +823,9 @@ public class IcebergTableMaintainer implements TableMaintainer {
                     DateTimeFormatter.ofPattern(
                         expirationConfig.getDateTimePattern(), 
Locale.getDefault()));
         return Expressions.lessThanOrEqual(field.name(), expireDateTime);
+      case DATE:
+        int expireDays = calExpireDaysForDate(expireTimestamp, field);
+        return Expressions.lessThan(field.name(), expireDays);
       default:
         return Expressions.alwaysTrue();
     }
@@ -994,6 +1030,10 @@ public class IcebergTableMaintainer implements TableMaintainer {
                   .atZone(getDefaultZoneId(field))
                   .toInstant()
                   .toEpochMilli());
+    } else if (type.typeId() == Type.TypeID.DATE) {
+      if (upperBound instanceof Integer) {
+        literal = Literal.of(((Integer) upperBound).longValue() * 24 * 60 * 60 
* 1000);
+      }
     }
 
     return literal;
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
index 9b469bd32..f5b239ff4 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java
@@ -190,7 +190,8 @@ public class TableConfigurations {
   }
 
   public static final Set<Type.TypeID> DATA_EXPIRATION_FIELD_TYPES =
-      Sets.newHashSet(Type.TypeID.TIMESTAMP, Type.TypeID.STRING, Type.TypeID.LONG);
+      // Field types supported for data expiration; this change adds DATE to the set.
+      Sets.newHashSet(
+          Type.TypeID.TIMESTAMP, Type.TypeID.STRING, Type.TypeID.LONG, Type.TypeID.DATE);
 
   private static boolean validateExpirationField(
       Types.NestedField field, String name, String expirationField) {
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
index 9e10d94c3..75b3e9a60 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
@@ -144,6 +144,38 @@ public class TestDataExpire extends ExecutorTestBase {
             PrimaryKeySpec.noPrimaryKey(),
             PartitionSpec.unpartitioned(),
             getDefaultProp())
+      },
+      // Mixed format partitioned by date type
+      {
+        new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
+        new BasicTableTestHelper(TABLE_SCHEMA3, PRIMARY_KEY_SPEC, SPEC3, 
getDefaultProp())
+      },
+      {
+        new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
+        new BasicTableTestHelper(TABLE_SCHEMA3, PRIMARY_KEY_SPEC, SPEC4, 
getDefaultProp())
+      },
+      {
+        new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
+        new BasicTableTestHelper(
+            TABLE_SCHEMA3, PRIMARY_KEY_SPEC, PartitionSpec.unpartitioned(), 
getDefaultProp())
+      },
+      {
+        new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
+        new BasicTableTestHelper(
+            TABLE_SCHEMA3, PrimaryKeySpec.noPrimaryKey(), SPEC3, 
getDefaultProp())
+      },
+      {
+        new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
+        new BasicTableTestHelper(
+            TABLE_SCHEMA3, PrimaryKeySpec.noPrimaryKey(), SPEC4, 
getDefaultProp())
+      },
+      {
+        new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG),
+        new BasicTableTestHelper(
+            TABLE_SCHEMA3,
+            PrimaryKeySpec.noPrimaryKey(),
+            PartitionSpec.unpartitioned(),
+            getDefaultProp())
       }
     };
   }
@@ -165,6 +197,19 @@ public class TestDataExpire extends ExecutorTestBase {
   public static final PartitionSpec SPEC2 =
       PartitionSpec.builderFor(TABLE_SCHEMA2).identity("op_time").build();
 
+  /** Test schema with a DATE-typed {@code op_time} column (ts stays a LONG). */
+  public static final Schema TABLE_SCHEMA3 =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.required(2, "name", Types.StringType.get()),
+          Types.NestedField.required(3, "ts", Types.LongType.get()),
+          Types.NestedField.required(4, "op_time", Types.DateType.get()));
+
+  /** Identity partitioning on the DATE column {@code op_time}. */
+  public static final PartitionSpec SPEC3 =
+      PartitionSpec.builderFor(TABLE_SCHEMA3).identity("op_time").build();
+
+  /** {@code day()} transform partitioning on the DATE column {@code op_time}. */
+  public static final PartitionSpec SPEC4 =
+      PartitionSpec.builderFor(TABLE_SCHEMA3).day("op_time").build();
+
   public TestDataExpire(CatalogTestHelper catalogTestHelper, TableTestHelper 
tableTestHelper) {
     super(catalogTestHelper, tableTestHelper);
   }
@@ -201,6 +246,10 @@ public class TestDataExpire extends ExecutorTestBase {
         expected =
             Lists.newArrayList(
                 createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), 
"2022-01-03T12:00:00"));
+      } else if (expireByDate()) {
+        expected =
+            Lists.newArrayList(
+                createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), 
"2022-01-03T12:00:00"));
       } else {
         expected =
             Lists.newArrayList(
@@ -256,6 +305,8 @@ public class TestDataExpire extends ExecutorTestBase {
     if (tableTestHelper().partitionSpec().isPartitioned()) {
       if (expireByStringDate()) {
         assertScanResult(scanAfterExpire, 1, 0);
+      } else if (expireByDate()) {
+        assertScanResult(scanAfterExpire, 1, 0);
       } else {
         assertScanResult(scanAfterExpire, 3, 0);
       }
@@ -270,6 +321,10 @@ public class TestDataExpire extends ExecutorTestBase {
         expected =
             Lists.newArrayList(
                 createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), 
"2022-01-03T12:00:00"));
+      } else if (expireByDate()) {
+        expected =
+            Lists.newArrayList(
+                createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), 
"2022-01-03T12:00:00"));
       } else {
         expected =
             Lists.newArrayList(
@@ -373,6 +428,10 @@ public class TestDataExpire extends ExecutorTestBase {
       expected =
           Lists.newArrayList(
               createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), 
"2022-01-03T12:00:00"));
+    } else if (expireByDate()) {
+      expected =
+          Lists.newArrayList(
+              createRecord(2, "222", parseMillis("2022-01-03T12:00:00"), 
"2022-01-03T12:00:00"));
     } else {
       expected =
           Lists.newArrayList(
@@ -549,6 +608,9 @@ public class TestDataExpire extends ExecutorTestBase {
                 .toInstant(ZoneOffset.UTC)
                 .toEpochMilli();
         break;
+      case DATE:
+        time = 
LocalDateTime.parse(opTime).atZone(ZoneOffset.UTC).toLocalDate();
+        break;
       default:
         time = opTime;
     }
@@ -637,4 +699,189 @@ public class TestDataExpire extends ExecutorTestBase {
     Map<String, String> properties = table.properties();
     return TableConfigurations.parseDataExpirationConfig(properties);
   }
+
+  /** Returns true when the table's configured data-expiration field is a DATE column. */
+  private boolean expireByDate() {
+    String fieldName =
+        CompatiblePropertyUtil.propertyAsString(
+            getMixedTable().properties(), TableProperties.DATA_EXPIRATION_FIELD, "");
+    Types.NestedField expirationField = getMixedTable().schema().findField(fieldName);
+    if (expirationField == null) {
+      return false;
+    }
+    return expirationField.type().typeId() == Type.TypeID.DATE;
+  }
+
+  @Test
+  public void testDateTypeBoundaryConditions() {
+    assumeTrue(expireByDate());
+
+    DataExpirationConfig config = parseDataExpirationConfig(getMixedTable());
+    // Append each record separately, probing both edges of 2022-01-03 and the next midnight.
+    List<Record> records =
+        Lists.newArrayList(
+            createRecord(1, "111", parseMillis("2022-01-03T00:00:00"), "2022-01-03T00:00:00"),
+            createRecord(2, "222", parseMillis("2022-01-03T00:00:01"), "2022-01-03T00:00:01"),
+            createRecord(3, "333", parseMillis("2022-01-03T23:59:59"), "2022-01-03T23:59:59"),
+            createRecord(4, "444", parseMillis("2022-01-04T00:00:00"), "2022-01-04T00:00:00"));
+
+    records.forEach(
+        r ->
+            OptimizingTestHelpers.appendBase(
+                getMixedTable(),
+                tableTestHelper()
+                    .writeBaseStore(getMixedTable(), 0, Lists.newArrayList(r), false)));
+
+    getMaintainerAndExpire(config, "2022-01-04T12:00:00.000");
+
+    List<Record> result = readSortedBaseRecords(getMixedTable());
+
+    List<Record> expected;
+    if (getMixedTable().spec().isPartitioned()) {
+      // Every 2022-01-03 row expires regardless of time of day; only the 2022-01-04 row stays.
+      expected =
+          Lists.newArrayList(
+              createRecord(4, "444", parseMillis("2022-01-04T00:00:00"), "2022-01-04T00:00:00"));
+    } else {
+      // Unpartitioned table at the default expiration level: no row is expected to be removed.
+      expected =
+          Lists.newArrayList(
+              createRecord(1, "111", parseMillis("2022-01-03T00:00:00"), "2022-01-03T00:00:00"),
+              createRecord(2, "222", parseMillis("2022-01-03T00:00:01"), "2022-01-03T00:00:01"),
+              createRecord(3, "333", parseMillis("2022-01-03T23:59:59"), "2022-01-03T23:59:59"),
+              createRecord(4, "444", parseMillis("2022-01-04T00:00:00"), "2022-01-04T00:00:00"));
+    }
+
+    Assert.assertEquals(expected, result);
+  }
+
+  @Test
+  public void testDateTypeBoundaryConditionsFileLevel() {
+    assumeTrue(expireByDate());
+
+    // Switch the table under test to FILE-level data expiration.
+    getMixedTable()
+        .updateProperties()
+        .set(TableProperties.DATA_EXPIRATION_LEVEL, DataExpirationConfig.ExpireLevel.FILE.name())
+        .commit();
+
+    List<Record> input =
+        Lists.newArrayList(
+            createRecord(1, "111", parseMillis("2022-01-03T00:00:00"), "2022-01-03T00:00:00"),
+            createRecord(2, "222", parseMillis("2022-01-03T00:00:01"), "2022-01-03T00:00:01"),
+            createRecord(3, "333", parseMillis("2022-01-03T23:59:59"), "2022-01-03T23:59:59"),
+            createRecord(4, "444", parseMillis("2022-01-04T00:00:00"), "2022-01-04T00:00:00"));
+
+    // One separate append per record.
+    for (Record row : input) {
+      OptimizingTestHelpers.appendBase(
+          getMixedTable(),
+          tableTestHelper().writeBaseStore(getMixedTable(), 0, Lists.newArrayList(row), false));
+    }
+
+    getMaintainerAndExpire(parseDataExpirationConfig(getMixedTable()), "2022-01-04T12:00:00.000");
+
+    // Only the 2022-01-04 row is expected to survive, whatever the partition spec.
+    List<Record> actual = readSortedBaseRecords(getMixedTable());
+    List<Record> expected =
+        Lists.newArrayList(
+            createRecord(4, "444", parseMillis("2022-01-04T00:00:00"), "2022-01-04T00:00:00"));
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testDateTypeBoundaryConditionsPartitionLevel() {
+    assumeTrue(expireByDate() && tableTestHelper().partitionSpec().isPartitioned());
+
+    // Two date partitions, each with a row at the very start and the very end of the day.
+    List<Record> records =
+        Lists.newArrayList(
+            createRecord(1, "111", parseMillis("2022-01-03T00:00:00"), "2022-01-03T00:00:00"),
+            createRecord(2, "222", parseMillis("2022-01-03T23:59:59"), "2022-01-03T23:59:59"),
+            createRecord(3, "333", parseMillis("2022-01-04T00:00:00"), "2022-01-04T00:00:00"),
+            createRecord(4, "444", parseMillis("2022-01-04T23:59:59"), "2022-01-04T23:59:59"));
+
+    OptimizingTestHelpers.appendBase(
+        getMixedTable(), tableTestHelper().writeBaseStore(getMixedTable(), 0, records, false));
+
+    DataExpirationConfig config = parseDataExpirationConfig(getMixedTable());
+    getMaintainerAndExpire(config, "2022-01-05T12:00:00.000");
+
+    List<Record> result = readSortedBaseRecords(getMixedTable());
+
+    // Both partitions are expected to be fully expired. assertEquals takes (expected, actual),
+    // so a failure reports the leftover row count as the actual value.
+    Assert.assertEquals(0, result.size());
+  }
+
+  @Test
+  public void testDateTypeBoundaryConditionsKeyedTable() {
+    // Keyed tables only: rows are written to both the base store and the change store.
+    assumeTrue(expireByDate() && isKeyedTable());
+
+    KeyedTable keyedTable = getMixedTable().asKeyedTable();
+
+    // Base store: one row at the start of 2022-01-03 and one at the start of 2022-01-04.
+    List<Record> baseRecords =
+        Lists.newArrayList(
+            createRecord(1, "111", parseMillis("2022-01-03T00:00:00"), "2022-01-03T00:00:00"),
+            createRecord(2, "222", parseMillis("2022-01-04T00:00:00"), "2022-01-04T00:00:00"));
+    OptimizingTestHelpers.appendBase(
+        keyedTable, tableTestHelper().writeBaseStore(keyedTable, 0, baseRecords, false));
+
+    // Change store (INSERT): rows one second either side of the 2022-01-04 midnight boundary.
+    List<Record> changeRecords =
+        Lists.newArrayList(
+            createRecord(3, "333", parseMillis("2022-01-03T23:59:59"), "2022-01-03T23:59:59"),
+            createRecord(4, "444", parseMillis("2022-01-04T00:00:01"), "2022-01-04T00:00:01"));
+    OptimizingTestHelpers.appendChange(
+        keyedTable,
+        tableTestHelper()
+            .writeChangeStore(keyedTable, 1L, ChangeAction.INSERT, changeRecords, false));
+
+    // Expire through MixedTableMaintainer, interpreting the wall-clock "now" in the
+    // expiration field's default zone.
+    DataExpirationConfig config = parseDataExpirationConfig(keyedTable);
+    MixedTableMaintainer tableMaintainer = new MixedTableMaintainer(keyedTable, null);
+    tableMaintainer.expireDataFrom(
+        config,
+        LocalDateTime.parse("2022-01-04T12:00:00.000")
+            .atZone(
+                IcebergTableMaintainer.getDefaultZoneId(
+                    keyedTable.schema().findField(config.getExpirationField())))
+            .toInstant());
+
+    List<Record> records = readSortedKeyedRecords(keyedTable);
+    List<Record> expected;
+    if (getMixedTable().spec().isPartitioned()) {
+      // Partitioned: every 2022-01-03 row (base and change) is expected to be gone.
+      expected =
+          Lists.newArrayList(
+              createRecord(2, "222", parseMillis("2022-01-04T00:00:00"), "2022-01-04T00:00:00"),
+              createRecord(4, "444", parseMillis("2022-01-04T00:00:01"), "2022-01-04T00:00:01"));
+    } else {
+      // Unpartitioned: no row is expected to be removed.
+      expected =
+          Lists.newArrayList(
+              createRecord(1, "111", parseMillis("2022-01-03T00:00:00"), "2022-01-03T00:00:00"),
+              createRecord(2, "222", parseMillis("2022-01-04T00:00:00"), "2022-01-04T00:00:00"),
+              createRecord(3, "333", parseMillis("2022-01-03T23:59:59"), "2022-01-03T23:59:59"),
+              createRecord(4, "444", parseMillis("2022-01-04T00:00:01"), "2022-01-04T00:00:01"));
+    }
+
+    Assert.assertEquals(expected, records);
+  }
+
+  @Test
+  public void testDateTypeCrossDateBoundaryScenarios() {
+    assumeTrue(expireByDate());
+
+    // Rows around a month end, a year end and the 2020 leap day, each appended separately.
+    List<Record> records =
+        Lists.newArrayList(
+            createRecord(1, "111", parseMillis("2022-01-31T23:59:59"), "2022-01-31T23:59:59"),
+            createRecord(2, "222", parseMillis("2022-02-01T00:00:00"), "2022-02-01T00:00:00"),
+            createRecord(3, "333", parseMillis("2021-12-31T23:59:59"), "2021-12-31T23:59:59"),
+            createRecord(4, "444", parseMillis("2022-01-01T00:00:00"), "2022-01-01T00:00:00"),
+            createRecord(5, "555", parseMillis("2020-02-29T12:00:00"), "2020-02-29T12:00:00"),
+            createRecord(6, "666", parseMillis("2020-03-01T00:00:00"), "2020-03-01T00:00:00"));
+
+    records.forEach(
+        r ->
+            OptimizingTestHelpers.appendBase(
+                getMixedTable(),
+                tableTestHelper()
+                    .writeBaseStore(getMixedTable(), 0, Lists.newArrayList(r), false)));
+
+    DataExpirationConfig config = parseDataExpirationConfig(getMixedTable());
+    getMaintainerAndExpire(config, "2022-02-02T12:00:00.000");
+
+    List<Record> result = readSortedBaseRecords(getMixedTable());
+
+    // All rows are expected to be expired; assertEquals takes (expected, actual).
+    Assert.assertEquals(0, result.size());
+  }
 }

Reply via email to