flyrain commented on code in PR #3415:
URL: https://github.com/apache/polaris/pull/3415#discussion_r2697150327
##########
runtime/service/src/test/java/org/apache/polaris/service/task/TableCleanupTaskHandlerTest.java:
##########
@@ -434,6 +435,272 @@ public void testTableCleanupMultipleSnapshots() throws IOException {
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));
}
+ @Test
+ public void testTableCleanupWithTaskPersistenceBatching() throws IOException {
+ FileIO fileIO = new InMemoryFileIO();
+ TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
+ TableCleanupTaskHandler handler = newTableCleanupTaskHandler(fileIO);
+
+ // Create 12 snapshots to generate 12 manifest cleanup tasks
+ // This tests that task persistence batching works with the default batch size of 100
+ List<Snapshot> snapshots = new ArrayList<>();
+ for (int i = 0; i < 12; i++) {
+ ManifestFile manifestFile =
+ TaskTestUtils.manifestFile(
+ fileIO,
+ "manifest" + i + ".avro",
+ 100L + i,
+ "dataFile" + (i * 2) + ".parquet",
+ "dataFile" + (i * 2 + 1) + ".parquet");
+ Snapshot snapshot =
+ TaskTestUtils.newSnapshot(
+ fileIO,
+ "manifestList" + i + ".avro",
+ i + 1,
+ 100L + i,
+ i > 0 ? 99L + i : 99L,
+ manifestFile);
+ snapshots.add(snapshot);
+ }
+
+ String metadataFile = "v1-batch-test.metadata.json";
+ TaskTestUtils.writeTableMetadata(fileIO, metadataFile, snapshots.toArray(new Snapshot[0]));
+
+ TaskEntity task =
+ new TaskEntity.Builder()
+ .setName("cleanup_" + tableIdentifier)
+ .withTaskType(AsyncTaskType.ENTITY_CLEANUP_SCHEDULER)
+ .withData(
+ new IcebergTableLikeEntity.Builder(
+ PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier, metadataFile)
+ .setName("table1")
+ .setCatalogId(1)
+ .setCreateTimestamp(100)
+ .build())
+ .build();
+ task = addTaskLocation(task);
+
+ handler.handleTask(task, callContext);
+
+ // Verify that all tasks were created
+ List<PolarisBaseEntity> entities =
+ metaStoreManager
+ .loadTasks(callContext.getPolarisCallContext(), "test", PageToken.fromLimit(50))
+ .getEntities();
+
+ List<PolarisBaseEntity> manifestCleanupTasks =
+ entities.stream()
+ .filter(
+ entity -> {
+ AsyncTaskType taskType = TaskEntity.of(entity).getTaskType();
+ return taskType == AsyncTaskType.MANIFEST_FILE_CLEANUP;
+ })
+ .toList();
+ List<PolarisBaseEntity> metadataCleanupTasks =
+ entities.stream()
+ .filter(
+ entity -> {
+ AsyncTaskType taskType = TaskEntity.of(entity).getTaskType();
+ return taskType == AsyncTaskType.BATCH_FILE_CLEANUP;
+ })
+ .toList();
+
+ // Should have 12 manifest cleanup tasks (one per unique manifest)
+ assertThat(manifestCleanupTasks).hasSize(12);
+ // Should have 2 metadata cleanup tasks: ceil(12 manifest lists / batch size of 10) = ceil(1.2) = 2
+ assertThat(metadataCleanupTasks).hasSize(2);
+ }
+
+ @Test
+ public void testManifestDeduplicationAcrossSnapshots() throws IOException {
+ FileIO fileIO = new InMemoryFileIO();
+ TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
+ TableCleanupTaskHandler handler = newTableCleanupTaskHandler(fileIO);
+
+ // Create shared manifest files that appear in multiple snapshots
+ ManifestFile sharedManifest1 =
+ TaskTestUtils.manifestFile(
+ fileIO, "shared-manifest1.avro", 100L, "dataFile1.parquet", "dataFile2.parquet");
+ ManifestFile sharedManifest2 =
+ TaskTestUtils.manifestFile(
+ fileIO, "shared-manifest2.avro", 100L, "dataFile3.parquet", "dataFile4.parquet");
+ ManifestFile uniqueManifest =
+ TaskTestUtils.manifestFile(fileIO, "unique-manifest.avro", 101L, "dataFile5.parquet");
+
+ // Create multiple snapshots that share manifests
+ Snapshot snapshot1 =
+ TaskTestUtils.newSnapshot(
+ fileIO, "manifestList1.avro", 1, 100L, 99L, sharedManifest1, sharedManifest2);
+ Snapshot snapshot2 =
+ TaskTestUtils.newSnapshot(
+ fileIO,
+ "manifestList2.avro",
+ 2,
+ 101L,
+ 100L,
+ sharedManifest1,
+ sharedManifest2,
+ uniqueManifest);
+ Snapshot snapshot3 =
+ TaskTestUtils.newSnapshot(
+ fileIO, "manifestList3.avro", 3, 102L, 101L, sharedManifest2, uniqueManifest);
+
+ String metadataFile = "v1-dedup-test.metadata.json";
+ TaskTestUtils.writeTableMetadata(fileIO, metadataFile, snapshot1, snapshot2, snapshot3);
+
+ TaskEntity task =
+ new TaskEntity.Builder()
+ .setName("cleanup_" + tableIdentifier)
+ .withTaskType(AsyncTaskType.ENTITY_CLEANUP_SCHEDULER)
+ .withData(
+ new IcebergTableLikeEntity.Builder(
+ PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier, metadataFile)
+ .setName("table1")
+ .setCatalogId(1)
+ .setCreateTimestamp(100)
+ .build())
+ .build();
+ task = addTaskLocation(task);
+
+ handler.handleTask(task, callContext);
+
+ List<PolarisBaseEntity> entities =
+ metaStoreManager
+ .loadTasks(callContext.getPolarisCallContext(), "test", PageToken.fromLimit(10))
+ .getEntities();
+
+ List<PolarisBaseEntity> manifestCleanupTasks =
+ entities.stream()
+ .filter(
+ entity -> {
+ AsyncTaskType taskType = TaskEntity.of(entity).getTaskType();
+ return taskType == AsyncTaskType.MANIFEST_FILE_CLEANUP;
+ })
+ .toList();
+
+ // Should have exactly 3 manifest cleanup tasks (deduplicated):
+ // sharedManifest1, sharedManifest2, and uniqueManifest
+ assertThat(manifestCleanupTasks)
+ .hasSize(3)
+ .satisfiesExactlyInAnyOrder(
+ taskEntity ->
+ assertThat(taskEntity)
+ .extracting(TaskEntity::of)
+ .returns(
+ ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
+ tableIdentifier, sharedManifest1),
+ entity ->
+ entity.readData(
+ ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)),
+ taskEntity ->
+ assertThat(taskEntity)
+ .extracting(TaskEntity::of)
+ .returns(
+ ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
+ tableIdentifier, sharedManifest2),
+ entity ->
+ entity.readData(
+ ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)),
+ taskEntity ->
+ assertThat(taskEntity)
+ .extracting(TaskEntity::of)
+ .returns(
+ ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
+ tableIdentifier, uniqueManifest),
+ entity ->
+ entity.readData(
+ ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));
+ }
+
+ @Test
+ public void testMetadataFileBatchingWithManyFiles() throws IOException {
Review Comment:
The intent of this unit test is not to simulate a truly large table, but to validate the batching behavior and its correctness when metadata files are processed incrementally. As is common practice, we avoid stress or scale tests in unit tests, since they would significantly slow down CI execution and are better suited to dedicated benchmarks.
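
For anyone tracing the batch math in these assertions: the expected metadata-cleanup task count is plain ceiling division. A minimal sketch of that arithmetic (the `expectedBatchCount` helper and the batch size of 10 are illustrative assumptions for this thread, not the handler's actual API):

```java
// Hypothetical helper showing the batch-count arithmetic the assertions
// rely on: ceil(fileCount / batchSize) computed with integer math.
static int expectedBatchCount(int fileCount, int batchSize) {
  return (fileCount + batchSize - 1) / batchSize;
}

// 12 manifest lists with an assumed batch size of 10 -> ceil(12 / 10) = 2,
// matching assertThat(metadataCleanupTasks).hasSize(2) above.
```

Likewise, testManifestDeduplicationAcrossSnapshots expects one cleanup task per distinct manifest across all snapshots. A rough sketch of that idea, assuming deduplication keys on `ManifestFile.path()` (the handler's actual implementation may differ; the class name is made up for illustration):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.io.FileIO;

class ManifestDedupSketch {
  // Collect each manifest once, keyed by path, across all snapshots.
  static List<ManifestFile> dedupedManifests(List<Snapshot> snapshots, FileIO fileIO) {
    Set<String> seenPaths = new HashSet<>();
    List<ManifestFile> result = new ArrayList<>();
    for (Snapshot snapshot : snapshots) {
      for (ManifestFile manifest : snapshot.allManifests(fileIO)) {
        if (seenPaths.add(manifest.path())) {
          result.add(manifest);
        }
      }
    }
    return result;
  }
}

// For the snapshots above this yields sharedManifest1, sharedManifest2,
// and uniqueManifest -> exactly 3 MANIFEST_FILE_CLEANUP tasks.
```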