pingtimeout commented on code in PR #3415:
URL: https://github.com/apache/polaris/pull/3415#discussion_r2689564694
##########
runtime/service/src/test/java/org/apache/polaris/service/task/TableCleanupTaskHandlerTest.java:
##########
@@ -434,6 +435,272 @@ public void testTableCleanupMultipleSnapshots() throws IOException {
                                 ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));
   }
+  @Test
+  public void testTableCleanupWithTaskPersistenceBatching() throws IOException {
+    FileIO fileIO = new InMemoryFileIO();
+    TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
+    TableCleanupTaskHandler handler = newTableCleanupTaskHandler(fileIO);
+
+    // Create 12 snapshots to generate 12 manifest cleanup tasks
+    // This tests that task persistence batching works with default batch size of 100
+    List<Snapshot> snapshots = new ArrayList<>();
+    for (int i = 0; i < 12; i++) {
+      ManifestFile manifestFile =
+          TaskTestUtils.manifestFile(
+              fileIO,
+              "manifest" + i + ".avro",
+              100L + i,
+              "dataFile" + (i * 2) + ".parquet",
+              "dataFile" + (i * 2 + 1) + ".parquet");
+      Snapshot snapshot =
+          TaskTestUtils.newSnapshot(
+              fileIO,
+              "manifestList" + i + ".avro",
+              i + 1,
+              100L + i,
+              i > 0 ? 99L + i : 99L,
+              manifestFile);
+      snapshots.add(snapshot);
+    }
+
+    String metadataFile = "v1-batch-test.metadata.json";
+    TaskTestUtils.writeTableMetadata(fileIO, metadataFile, snapshots.toArray(new Snapshot[0]));
+
+    TaskEntity task =
+        new TaskEntity.Builder()
+            .setName("cleanup_" + tableIdentifier)
+            .withTaskType(AsyncTaskType.ENTITY_CLEANUP_SCHEDULER)
+            .withData(
+                new IcebergTableLikeEntity.Builder(
+                        PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier, metadataFile)
+                    .setName("table1")
+                    .setCatalogId(1)
+                    .setCreateTimestamp(100)
+                    .build())
+            .build();
+    task = addTaskLocation(task);
+
+    handler.handleTask(task, callContext);
+
+    // Verify that all tasks were created
+    List<PolarisBaseEntity> entities =
+        metaStoreManager
+            .loadTasks(callContext.getPolarisCallContext(), "test", PageToken.fromLimit(50))
+            .getEntities();
+
+    List<PolarisBaseEntity> manifestCleanupTasks =
+        entities.stream()
+            .filter(
+                entity -> {
+                  AsyncTaskType taskType = TaskEntity.of(entity).getTaskType();
+                  return taskType == AsyncTaskType.MANIFEST_FILE_CLEANUP;
+                })
+            .toList();
+    List<PolarisBaseEntity> metadataCleanupTasks =
+        entities.stream()
+            .filter(
+                entity -> {
+                  AsyncTaskType taskType = TaskEntity.of(entity).getTaskType();
+                  return taskType == AsyncTaskType.BATCH_FILE_CLEANUP;
+                })
+            .toList();
+
+    // Should have 12 manifest cleanup tasks (one per unique manifest)
+    assertThat(manifestCleanupTasks).hasSize(12);
+    // Should have 2 metadata cleanup tasks (12 manifest lists / batch size of 10 = ceil(1.2) = 2)
+    assertThat(metadataCleanupTasks).hasSize(2);
+  }
+
+  @Test
+  public void testManifestDeduplicationAcrossSnapshots() throws IOException {
+    FileIO fileIO = new InMemoryFileIO();
+    TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
+    TableCleanupTaskHandler handler = newTableCleanupTaskHandler(fileIO);
+
+    // Create shared manifest files that appear in multiple snapshots
+    ManifestFile sharedManifest1 =
+        TaskTestUtils.manifestFile(
+            fileIO, "shared-manifest1.avro", 100L, "dataFile1.parquet", "dataFile2.parquet");
+    ManifestFile sharedManifest2 =
+        TaskTestUtils.manifestFile(
+            fileIO, "shared-manifest2.avro", 100L, "dataFile3.parquet", "dataFile4.parquet");
+    ManifestFile uniqueManifest =
+        TaskTestUtils.manifestFile(fileIO, "unique-manifest.avro", 101L, "dataFile5.parquet");
+
+    // Create multiple snapshots that share manifests
+    Snapshot snapshot1 =
+        TaskTestUtils.newSnapshot(
+            fileIO, "manifestList1.avro", 1, 100L, 99L, sharedManifest1, sharedManifest2);
+    Snapshot snapshot2 =
+        TaskTestUtils.newSnapshot(
+            fileIO,
+            "manifestList2.avro",
+            2,
+            101L,
+            100L,
+            sharedManifest1,
+            sharedManifest2,
+            uniqueManifest);
+    Snapshot snapshot3 =
+        TaskTestUtils.newSnapshot(
+            fileIO, "manifestList3.avro", 3, 102L, 101L, sharedManifest2, uniqueManifest);
+
+    String metadataFile = "v1-dedup-test.metadata.json";
+    TaskTestUtils.writeTableMetadata(fileIO, metadataFile, snapshot1, snapshot2, snapshot3);
+
+    TaskEntity task =
+        new TaskEntity.Builder()
+            .setName("cleanup_" + tableIdentifier)
+            .withTaskType(AsyncTaskType.ENTITY_CLEANUP_SCHEDULER)
+            .withData(
+                new IcebergTableLikeEntity.Builder(
+                        PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier, metadataFile)
+                    .setName("table1")
+                    .setCatalogId(1)
+                    .setCreateTimestamp(100)
+                    .build())
+            .build();
+    task = addTaskLocation(task);
+
+    handler.handleTask(task, callContext);
+
+    List<PolarisBaseEntity> entities =
+        metaStoreManager
+            .loadTasks(callContext.getPolarisCallContext(), "test", PageToken.fromLimit(10))
+            .getEntities();
+
+    List<PolarisBaseEntity> manifestCleanupTasks =
+        entities.stream()
+            .filter(
+                entity -> {
+                  AsyncTaskType taskType = TaskEntity.of(entity).getTaskType();
+                  return taskType == AsyncTaskType.MANIFEST_FILE_CLEANUP;
+                })
+            .toList();
+
+    // Should have exactly 3 manifest cleanup tasks (deduplicated):
+    // sharedManifest1, sharedManifest2, and uniqueManifest
+    assertThat(manifestCleanupTasks)
+        .hasSize(3)
+        .satisfiesExactlyInAnyOrder(
+            taskEntity ->
+                assertThat(taskEntity)
+                    .extracting(TaskEntity::of)
+                    .returns(
+                        ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
+                            tableIdentifier, sharedManifest1),
+                        entity ->
+                            entity.readData(
+                                ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)),
+            taskEntity ->
+                assertThat(taskEntity)
+                    .extracting(TaskEntity::of)
+                    .returns(
+                        ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
+                            tableIdentifier, sharedManifest2),
+                        entity ->
+                            entity.readData(
+                                ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)),
+            taskEntity ->
+                assertThat(taskEntity)
+                    .extracting(TaskEntity::of)
+                    .returns(
+                        ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
+                            tableIdentifier, uniqueManifest),
+                        entity ->
+                            entity.readData(
+                                ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));
+  }
+
+ @Test
+ public void testMetadataFileBatchingWithManyFiles() throws IOException {
Review Comment:
This test is named `testMetadataFileBatchingWithManyFiles`, but it only creates
24 files in total. Unfortunately that does not demonstrate that the change
actually handles large tables any better.
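For illustration only, here is a rough sketch of what a variant that actually scales the file count could look like. It reuses the helpers already visible in this test class; the parameterization (`@ParameterizedTest`/`@ValueSource`), the file counts, and the page limit passed to `loadTasks` are assumptions made for the sketch, not something this PR must adopt as-is.

```java
// Rough sketch only: assumes the surrounding test class (fileIO helpers, callContext,
// metaStoreManager, newTableCleanupTaskHandler, addTaskLocation) plus the JUnit 5
// org.junit.jupiter.params.ParameterizedTest / ValueSource imports.
@ParameterizedTest
@ValueSource(ints = {24, 2_000}) // file counts are arbitrary examples
public void testMetadataFileBatchingScales(int dataFileCount) throws IOException {
  FileIO fileIO = new InMemoryFileIO();
  TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
  TableCleanupTaskHandler handler = newTableCleanupTaskHandler(fileIO);

  int snapshotCount = dataFileCount / 2; // two data files per manifest/snapshot
  List<Snapshot> snapshots = new ArrayList<>();
  for (int i = 0; i < snapshotCount; i++) {
    ManifestFile manifestFile =
        TaskTestUtils.manifestFile(
            fileIO,
            "manifest" + i + ".avro",
            100L + i,
            "dataFile" + (i * 2) + ".parquet",
            "dataFile" + (i * 2 + 1) + ".parquet");
    snapshots.add(
        TaskTestUtils.newSnapshot(
            fileIO, "manifestList" + i + ".avro", i + 1, 100L + i, i > 0 ? 99L + i : 99L,
            manifestFile));
  }

  String metadataFile = "v1-scale-test.metadata.json";
  TaskTestUtils.writeTableMetadata(fileIO, metadataFile, snapshots.toArray(new Snapshot[0]));

  TaskEntity task =
      addTaskLocation(
          new TaskEntity.Builder()
              .setName("cleanup_" + tableIdentifier)
              .withTaskType(AsyncTaskType.ENTITY_CLEANUP_SCHEDULER)
              .withData(
                  new IcebergTableLikeEntity.Builder(
                          PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier, metadataFile)
                      .setName("table1")
                      .setCatalogId(1)
                      .setCreateTimestamp(100)
                      .build())
              .build());

  handler.handleTask(task, callContext);

  List<PolarisBaseEntity> entities =
      metaStoreManager
          .loadTasks(
              callContext.getPolarisCallContext(), "test", PageToken.fromLimit(snapshotCount * 2))
          .getEntities();

  long manifestCleanupTasks =
      entities.stream()
          .filter(e -> TaskEntity.of(e).getTaskType() == AsyncTaskType.MANIFEST_FILE_CLEANUP)
          .count();
  // one cleanup task per unique manifest, regardless of how many files the table holds
  assertThat(manifestCleanupTasks).isEqualTo(snapshotCount);
}
```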
##########
runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java:
##########
@@ -209,7 +263,28 @@ private Stream<TaskEntity> getMetadataTaskStream(
       CallContext callContext) {
     PolarisCallContext polarisCallContext = callContext.getPolarisCallContext();
     int batchSize = callContext.getRealmConfig().getConfig(BATCH_SIZE_CONFIG_KEY, 10);
-    return getMetadataFileBatches(tableMetadata, batchSize).stream()
+
+    // Stream all metadata files without materializing them all at once
Review Comment:
The only thing that this change does is postpone the call to the `.map(...)`
methods, but as far as I can tell the memory consumption stays identical.
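As a toy illustration of that point (plain standalone Java with made-up path names, not the actual handler code): when the source collection is already materialized, deferring the mapping step only changes when the work happens, not how much data is retained on the heap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class LazyMapMemoryDemo {
  public static void main(String[] args) {
    // Stand-in for "all metadata file paths": already fully materialized in memory.
    List<String> allPaths =
        IntStream.range(0, 1_000)
            .mapToObj(i -> "metadata/v" + i + ".metadata.json")
            .collect(Collectors.toList());

    int batchSize = 10;

    // Eager: build every batch up front.
    List<List<String>> eagerBatches = new ArrayList<>();
    for (int i = 0; i < allPaths.size(); i += batchSize) {
      eagerBatches.add(
          new ArrayList<>(allPaths.subList(i, Math.min(i + batchSize, allPaths.size()))));
    }

    // "Lazy": the mapping to batches is deferred until the stream is consumed, but the
    // backing list is the same 1,000 strings that were already on the heap.
    Stream<List<String>> lazyBatches =
        IntStream.iterate(0, i -> i < allPaths.size(), i -> i + batchSize)
            .mapToObj(i -> allPaths.subList(i, Math.min(i + batchSize, allPaths.size())));

    // Both produce the same number of batches over the same retained data.
    System.out.println(eagerBatches.size() + " eager batches, " + lazyBatches.count() + " lazy");
  }
}
```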
##########
runtime/service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java:
##########
@@ -160,13 +213,14 @@ private Stream<TaskEntity> getManifestTaskStream(
     // read the manifest list for each snapshot. dedupe the manifest files and schedule a
     // cleanupTask
     // for each manifest file and its data files to be deleted
+    // Use a Set to track seen paths for deduplication without materializing all ManifestFile
+    // objects
+    Set<String> seenPaths = new HashSet<>();
     return tableMetadata.snapshots().stream()
         .flatMap(sn -> sn.allManifests(fileIO).stream())
-        // distinct by manifest path, since multiple snapshots will contain the same
-        // manifest
-        .collect(Collectors.toMap(ManifestFile::path, Function.identity(), (mf1, mf2) -> mf1))
-        .values()
-        .stream()
+        // distinct by manifest path, since multiple snapshots will contain the same manifest
+        // Use stateful filter to dedupe while streaming
+        .filter(mf -> seenPaths.add(mf.path()))
Review Comment:
This line adds the path of every unique manifest file across all snapshots to a
set that is kept in memory. Even though the stream is lazy, all unique manifest
paths still end up materialized on the heap, so the space complexity does not
change.
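A small self-contained comparison of the two deduplication strategies (again a toy example with made-up paths, not the real handler code) shows that both end up holding one entry per unique manifest path:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DedupSpaceDemo {
  public static void main(String[] args) {
    List<String> manifestPaths =
        List.of("m1.avro", "m2.avro", "m1.avro", "m3.avro", "m2.avro");

    // Previous approach: collect into a Map keyed by path, then stream its values.
    // The map holds one entry per unique path until the whole pipeline is done.
    List<String> viaMap =
        manifestPaths.stream()
            .collect(Collectors.toMap(Function.identity(), Function.identity(), (a, b) -> a))
            .values()
            .stream()
            .toList();

    // New approach: stateful filter over a HashSet. The pipeline stays lazy, but the set
    // still grows to one entry per unique path, so retained memory is of the same order.
    Set<String> seenPaths = new HashSet<>();
    List<String> viaFilter = manifestPaths.stream().filter(seenPaths::add).toList();

    System.out.println(viaMap.size() + " vs " + viaFilter.size()); // both print 3
  }
}
```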
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]