pingtimeout commented on code in PR #3415:
URL: https://github.com/apache/polaris/pull/3415#discussion_r2704149700
##########
runtime/service/src/test/java/org/apache/polaris/service/task/TableCleanupTaskHandlerTest.java:
##########
@@ -434,6 +435,272 @@ public void testTableCleanupMultipleSnapshots() throws
IOException {
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));
}
  /**
   * Verifies that a single table-cleanup scheduler task fans out into the expected number of
   * persisted child tasks when many snapshots are involved: one MANIFEST_FILE_CLEANUP task per
   * unique manifest, plus BATCH_FILE_CLEANUP tasks for the manifest-list/metadata files, batched
   * by the handler's metadata-file batch size.
   */
  @Test
  public void testTableCleanupWithTaskPersistenceBatching() throws IOException {
    FileIO fileIO = new InMemoryFileIO();
    TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
    TableCleanupTaskHandler handler = newTableCleanupTaskHandler(fileIO);

    // Create 12 snapshots, each referencing its own manifest (with two data files), so the
    // handler must persist 12 distinct manifest cleanup tasks in one handleTask() call.
    // NOTE(review): the comment in the original patch said "default batch size of 100", but the
    // assertion below implies a metadata-file batch size of 10 — confirm which is configured.
    List<Snapshot> snapshots = new ArrayList<>();
    for (int i = 0; i < 12; i++) {
      ManifestFile manifestFile =
          TaskTestUtils.manifestFile(
              fileIO,
              "manifest" + i + ".avro",
              100L + i,
              "dataFile" + (i * 2) + ".parquet",
              "dataFile" + (i * 2 + 1) + ".parquet");
      Snapshot snapshot =
          TaskTestUtils.newSnapshot(
              fileIO,
              "manifestList" + i + ".avro",
              i + 1,
              100L + i,
              // First snapshot gets parent id 99L; each later snapshot points at its predecessor.
              i > 0 ? 99L + i : 99L,
              manifestFile);
      snapshots.add(snapshot);
    }

    String metadataFile = "v1-batch-test.metadata.json";
    TaskTestUtils.writeTableMetadata(fileIO, metadataFile, snapshots.toArray(new Snapshot[0]));

    // Build the scheduler task that points the handler at the table metadata written above.
    TaskEntity task =
        new TaskEntity.Builder()
            .setName("cleanup_" + tableIdentifier)
            .withTaskType(AsyncTaskType.ENTITY_CLEANUP_SCHEDULER)
            .withData(
                new IcebergTableLikeEntity.Builder(
                        PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier, metadataFile)
                    .setName("table1")
                    .setCatalogId(1)
                    .setCreateTimestamp(100)
                    .build())
            .build();
    task = addTaskLocation(task);

    handler.handleTask(task, callContext);

    // Load all persisted child tasks; a page limit of 50 comfortably covers the 14 expected.
    List<PolarisBaseEntity> entities =
        metaStoreManager
            .loadTasks(callContext.getPolarisCallContext(), "test", PageToken.fromLimit(50))
            .getEntities();

    List<PolarisBaseEntity> manifestCleanupTasks =
        entities.stream()
            .filter(
                entity -> {
                  AsyncTaskType taskType = TaskEntity.of(entity).getTaskType();
                  return taskType == AsyncTaskType.MANIFEST_FILE_CLEANUP;
                })
            .toList();
    List<PolarisBaseEntity> metadataCleanupTasks =
        entities.stream()
            .filter(
                entity -> {
                  AsyncTaskType taskType = TaskEntity.of(entity).getTaskType();
                  return taskType == AsyncTaskType.BATCH_FILE_CLEANUP;
                })
            .toList();

    // Should have 12 manifest cleanup tasks (one per unique manifest)
    assertThat(manifestCleanupTasks).hasSize(12);
    // Should have 2 metadata cleanup tasks (12 manifest lists / batch size of 10 -> ceil = 2)
    assertThat(metadataCleanupTasks).hasSize(2);
  }
+
  /**
   * Verifies that manifests shared by several snapshots are cleaned up exactly once: the handler
   * must deduplicate manifests across snapshots and persist a single MANIFEST_FILE_CLEANUP task
   * per unique manifest file.
   */
  @Test
  public void testManifestDeduplicationAcrossSnapshots() throws IOException {
    FileIO fileIO = new InMemoryFileIO();
    TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
    TableCleanupTaskHandler handler = newTableCleanupTaskHandler(fileIO);

    // Two manifests reused by multiple snapshots, plus one manifest unique to later snapshots.
    ManifestFile sharedManifest1 =
        TaskTestUtils.manifestFile(
            fileIO, "shared-manifest1.avro", 100L, "dataFile1.parquet", "dataFile2.parquet");
    ManifestFile sharedManifest2 =
        TaskTestUtils.manifestFile(
            fileIO, "shared-manifest2.avro", 100L, "dataFile3.parquet", "dataFile4.parquet");
    ManifestFile uniqueManifest =
        TaskTestUtils.manifestFile(fileIO, "unique-manifest.avro", 101L, "dataFile5.parquet");

    // Three snapshots with overlapping manifest membership:
    //   snapshot1 -> shared1, shared2
    //   snapshot2 -> shared1, shared2, unique
    //   snapshot3 -> shared2, unique
    Snapshot snapshot1 =
        TaskTestUtils.newSnapshot(
            fileIO, "manifestList1.avro", 1, 100L, 99L, sharedManifest1, sharedManifest2);
    Snapshot snapshot2 =
        TaskTestUtils.newSnapshot(
            fileIO,
            "manifestList2.avro",
            2,
            101L,
            100L,
            sharedManifest1,
            sharedManifest2,
            uniqueManifest);
    Snapshot snapshot3 =
        TaskTestUtils.newSnapshot(
            fileIO, "manifestList3.avro", 3, 102L, 101L, sharedManifest2, uniqueManifest);

    String metadataFile = "v1-dedup-test.metadata.json";
    TaskTestUtils.writeTableMetadata(fileIO, metadataFile, snapshot1, snapshot2, snapshot3);

    // Build the scheduler task that points the handler at the table metadata written above.
    TaskEntity task =
        new TaskEntity.Builder()
            .setName("cleanup_" + tableIdentifier)
            .withTaskType(AsyncTaskType.ENTITY_CLEANUP_SCHEDULER)
            .withData(
                new IcebergTableLikeEntity.Builder(
                        PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier, metadataFile)
                    .setName("table1")
                    .setCatalogId(1)
                    .setCreateTimestamp(100)
                    .build())
            .build();
    task = addTaskLocation(task);

    handler.handleTask(task, callContext);

    List<PolarisBaseEntity> entities =
        metaStoreManager
            .loadTasks(callContext.getPolarisCallContext(), "test", PageToken.fromLimit(10))
            .getEntities();

    List<PolarisBaseEntity> manifestCleanupTasks =
        entities.stream()
            .filter(
                entity -> {
                  AsyncTaskType taskType = TaskEntity.of(entity).getTaskType();
                  return taskType == AsyncTaskType.MANIFEST_FILE_CLEANUP;
                })
            .toList();

    // Should have exactly 3 manifest cleanup tasks (deduplicated):
    // sharedManifest1, sharedManifest2, and uniqueManifest — order is not guaranteed.
    assertThat(manifestCleanupTasks)
        .hasSize(3)
        .satisfiesExactlyInAnyOrder(
            taskEntity ->
                assertThat(taskEntity)
                    .extracting(TaskEntity::of)
                    .returns(
                        ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
                            tableIdentifier, sharedManifest1),
                        entity ->
                            entity.readData(
                                ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)),
            taskEntity ->
                assertThat(taskEntity)
                    .extracting(TaskEntity::of)
                    .returns(
                        ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
                            tableIdentifier, sharedManifest2),
                        entity ->
                            entity.readData(
                                ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)),
            taskEntity ->
                assertThat(taskEntity)
                    .extracting(TaskEntity::of)
                    .returns(
                        ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
                            tableIdentifier, uniqueManifest),
                        entity ->
                            entity.readData(
                                ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));
  }
+
+ @Test
+ public void testMetadataFileBatchingWithManyFiles() throws IOException {
Review Comment:
I understand that unit tests should be quick to avoid slowing down CI. My
main concern is whether this code change has been tested at scale — and if
so, how?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]