flyrain commented on code in PR #312:
URL: https://github.com/apache/polaris/pull/312#discussion_r1837541021
##########
polaris-service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java:
##########
@@ -154,15 +190,32 @@ public boolean handleTask(TaskEntity cleanupTask) {
.addKeyValue("tableIdentifier", tableEntity.getTableIdentifier())
.addKeyValue("metadataLocation", tableEntity.getMetadataLocation())
.addKeyValue("taskCount", taskEntities.size())
- .log("Successfully queued tasks to delete manifests - deleting
table metadata file");
+ .log(
+ "Successfully queued tasks to delete manifests, previous
metadata, and statistics files - deleting table metadata file");
for (PolarisBaseEntity createdTask : createdTasks) {
taskExecutor.addTaskHandlerContext(createdTask.getId(),
CallContext.getCurrentContext());
}
+
fileIO.deleteFile(tableEntity.getMetadataLocation());
return true;
}
}
return false;
}
+
+ private List<List<String>> getContentFileBatch(TableMetadata tableMetadata,
FileIO fileIO) {
Review Comment:
The name `contentFile` in Iceberg refers to a data file or a delete file.
Let's pick another name to avoid confusion, as metadata files and statistics
files are not content files. How about `metadataFile`?
##########
polaris-service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java:
##########
@@ -154,15 +190,32 @@ public boolean handleTask(TaskEntity cleanupTask) {
.addKeyValue("tableIdentifier", tableEntity.getTableIdentifier())
.addKeyValue("metadataLocation", tableEntity.getMetadataLocation())
.addKeyValue("taskCount", taskEntities.size())
- .log("Successfully queued tasks to delete manifests - deleting
table metadata file");
+ .log(
+ "Successfully queued tasks to delete manifests, previous
metadata, and statistics files - deleting table metadata file");
for (PolarisBaseEntity createdTask : createdTasks) {
taskExecutor.addTaskHandlerContext(createdTask.getId(),
CallContext.getCurrentContext());
}
+
fileIO.deleteFile(tableEntity.getMetadataLocation());
return true;
}
}
return false;
}
+
+ private List<List<String>> getContentFileBatch(TableMetadata tableMetadata,
FileIO fileIO) {
+ List<List<String>> result = new ArrayList<>();
+ List<String> contentFiles =
+ Stream.concat(
+
tableMetadata.previousFiles().stream().map(TableMetadata.MetadataLogEntry::file),
+
tableMetadata.statisticsFiles().stream().map(StatisticsFile::path))
+ .filter(file -> TaskUtils.exists(file, fileIO))
Review Comment:
Do we need to check the file existence here? It will be checked at delete
time, right?
##########
polaris-service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java:
##########
@@ -68,58 +70,107 @@ public boolean canHandleTask(TaskEntity task) {
@Override
public boolean handleTask(TaskEntity task) {
ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class);
- ManifestFile manifestFile =
decodeManifestData(cleanupTask.getManifestFileData());
TableIdentifier tableId = cleanupTask.getTableId();
try (FileIO authorizedFileIO = fileIOSupplier.apply(task)) {
-
- // if the file doesn't exist, we assume that another task execution was
successful, but failed
- // to drop the task entity. Log a warning and return success
- if (!TaskUtils.exists(manifestFile.path(), authorizedFileIO)) {
+ if (cleanupTask.getManifestFileData() != null) {
+ ManifestFile manifestFile =
decodeManifestData(cleanupTask.getManifestFileData());
+ return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId);
+ } else if (cleanupTask.getContentFileBatch() != null) {
+ return cleanUpContentFiles(cleanupTask.getContentFileBatch(),
authorizedFileIO, tableId);
+ } else {
LOGGER
.atWarn()
- .addKeyValue("manifestFile", manifestFile.path())
.addKeyValue("tableId", tableId)
- .log("Manifest cleanup task scheduled, but manifest file doesn't
exist");
+ .log("Cleanup task scheduled, but input file doesn't exist");
return true;
}
+ }
+ }
- ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile,
authorizedFileIO);
- List<CompletableFuture<Void>> dataFileDeletes =
- StreamSupport.stream(
- Spliterators.spliteratorUnknownSize(dataFiles.iterator(),
Spliterator.IMMUTABLE),
- false)
- .map(
- file ->
- tryDelete(
- tableId, authorizedFileIO, manifestFile,
file.path().toString(), null, 1))
- .toList();
- LOGGER.debug(
- "Scheduled {} data files to be deleted from manifest {}",
- dataFileDeletes.size(),
- manifestFile.path());
- try {
- // wait for all data files to be deleted, then wait for the manifest
itself to be deleted
-
CompletableFuture.allOf(dataFileDeletes.toArray(CompletableFuture[]::new))
- .thenCompose(
- (v) -> {
- LOGGER
- .atInfo()
- .addKeyValue("manifestFile", manifestFile.path())
- .log("All data files in manifest deleted - deleting
manifest");
- return tryDelete(
- tableId, authorizedFileIO, manifestFile,
manifestFile.path(), null, 1);
- })
- .get();
- return true;
- } catch (InterruptedException e) {
- LOGGER.error(
- "Interrupted exception deleting data files from manifest {}",
manifestFile.path(), e);
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- LOGGER.error("Unable to delete data files from manifest {}",
manifestFile.path(), e);
- return false;
- }
+ private boolean cleanUpManifestFile(
+ ManifestFile manifestFile, FileIO fileIO, TableIdentifier tableId) {
+ // if the file doesn't exist, we assume that another task execution was
successful, but
+ // failed to drop the task entity. Log a warning and return success
+ if (!TaskUtils.exists(manifestFile.path(), fileIO)) {
+ LOGGER
+ .atWarn()
+ .addKeyValue("manifestFile", manifestFile.path())
+ .addKeyValue("tableId", tableId)
+ .log("Manifest cleanup task scheduled, but manifest file doesn't
exist");
+ return true;
+ }
+
+ ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile,
fileIO);
+ List<CompletableFuture<Void>> dataFileDeletes =
+ StreamSupport.stream(
+ Spliterators.spliteratorUnknownSize(dataFiles.iterator(),
Spliterator.IMMUTABLE),
+ false)
+ .map(file -> tryDelete(tableId, fileIO, manifestFile,
file.path().toString(), null, 1))
+ .toList();
+ LOGGER.debug(
+ "Scheduled {} data files to be deleted from manifest {}",
+ dataFileDeletes.size(),
+ manifestFile.path());
+ try {
+ // wait for all data files to be deleted, then wait for the manifest
itself to be deleted
+
CompletableFuture.allOf(dataFileDeletes.toArray(CompletableFuture[]::new))
+ .thenCompose(
+ (v) -> {
+ LOGGER
+ .atInfo()
+ .addKeyValue("manifestFile", manifestFile.path())
+ .log("All data files in manifest deleted - deleting
manifest");
+ return tryDelete(tableId, fileIO, manifestFile,
manifestFile.path(), null, 1);
+ })
+ .get();
+ return true;
+ } catch (InterruptedException e) {
+ LOGGER.error(
+ "Interrupted exception deleting data files from manifest {}",
manifestFile.path(), e);
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ LOGGER.error("Unable to delete data files from manifest {}",
manifestFile.path(), e);
+ return false;
+ }
+ }
+
+ private boolean cleanUpContentFiles(
+ List<String> contentFileBatch, FileIO fileIO, TableIdentifier tableId) {
+ List<String> validFiles =
+ contentFileBatch.stream().filter(file -> TaskUtils.exists(file,
fileIO)).toList();
+ if (validFiles.isEmpty()) {
+ LOGGER
+ .atWarn()
+ .addKeyValue("contentFileBatch", contentFileBatch.toString())
+ .addKeyValue("tableId", tableId)
+ .log("Table content cleanup task scheduled, but the none of the file
in batch exists");
+ return true;
}
+
+ // Schedule the deletion for each file asynchronously
+ List<CompletableFuture<Void>> deleteFutures =
+ validFiles.stream().map(file -> tryDelete(tableId, fileIO, null, file,
null, 1)).toList();
+
+ // Wait for all delete operations to finish
+ try {
+ CompletableFuture<Void> allDeletes =
+ CompletableFuture.allOf(deleteFutures.toArray(new
CompletableFuture[0]));
+ allDeletes.join();
+ } catch (Exception e) {
+ LOGGER
+ .atWarn()
+ .addKeyValue("contentFileBatch", contentFileBatch.toString())
+ .addKeyValue("tableId", tableId)
+ .log("Exception detected during content file batch deletion", e);
Review Comment:
Should we throw here?
##########
polaris-service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java:
##########
@@ -68,58 +70,107 @@ public boolean canHandleTask(TaskEntity task) {
@Override
public boolean handleTask(TaskEntity task) {
ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class);
- ManifestFile manifestFile =
decodeManifestData(cleanupTask.getManifestFileData());
TableIdentifier tableId = cleanupTask.getTableId();
try (FileIO authorizedFileIO = fileIOSupplier.apply(task)) {
-
- // if the file doesn't exist, we assume that another task execution was
successful, but failed
- // to drop the task entity. Log a warning and return success
- if (!TaskUtils.exists(manifestFile.path(), authorizedFileIO)) {
+ if (cleanupTask.getManifestFileData() != null) {
+ ManifestFile manifestFile =
decodeManifestData(cleanupTask.getManifestFileData());
+ return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId);
+ } else if (cleanupTask.getContentFileBatch() != null) {
+ return cleanUpContentFiles(cleanupTask.getContentFileBatch(),
authorizedFileIO, tableId);
+ } else {
LOGGER
.atWarn()
- .addKeyValue("manifestFile", manifestFile.path())
.addKeyValue("tableId", tableId)
- .log("Manifest cleanup task scheduled, but manifest file doesn't
exist");
+ .log("Cleanup task scheduled, but input file doesn't exist");
return true;
}
+ }
+ }
- ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile,
authorizedFileIO);
- List<CompletableFuture<Void>> dataFileDeletes =
- StreamSupport.stream(
- Spliterators.spliteratorUnknownSize(dataFiles.iterator(),
Spliterator.IMMUTABLE),
- false)
- .map(
- file ->
- tryDelete(
- tableId, authorizedFileIO, manifestFile,
file.path().toString(), null, 1))
- .toList();
- LOGGER.debug(
- "Scheduled {} data files to be deleted from manifest {}",
- dataFileDeletes.size(),
- manifestFile.path());
- try {
- // wait for all data files to be deleted, then wait for the manifest
itself to be deleted
-
CompletableFuture.allOf(dataFileDeletes.toArray(CompletableFuture[]::new))
- .thenCompose(
- (v) -> {
- LOGGER
- .atInfo()
- .addKeyValue("manifestFile", manifestFile.path())
- .log("All data files in manifest deleted - deleting
manifest");
- return tryDelete(
- tableId, authorizedFileIO, manifestFile,
manifestFile.path(), null, 1);
- })
- .get();
- return true;
- } catch (InterruptedException e) {
- LOGGER.error(
- "Interrupted exception deleting data files from manifest {}",
manifestFile.path(), e);
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- LOGGER.error("Unable to delete data files from manifest {}",
manifestFile.path(), e);
- return false;
- }
+ private boolean cleanUpManifestFile(
+ ManifestFile manifestFile, FileIO fileIO, TableIdentifier tableId) {
+ // if the file doesn't exist, we assume that another task execution was
successful, but
+ // failed to drop the task entity. Log a warning and return success
+ if (!TaskUtils.exists(manifestFile.path(), fileIO)) {
+ LOGGER
+ .atWarn()
+ .addKeyValue("manifestFile", manifestFile.path())
+ .addKeyValue("tableId", tableId)
+ .log("Manifest cleanup task scheduled, but manifest file doesn't
exist");
+ return true;
+ }
+
+ ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile,
fileIO);
+ List<CompletableFuture<Void>> dataFileDeletes =
+ StreamSupport.stream(
+ Spliterators.spliteratorUnknownSize(dataFiles.iterator(),
Spliterator.IMMUTABLE),
+ false)
+ .map(file -> tryDelete(tableId, fileIO, manifestFile,
file.path().toString(), null, 1))
+ .toList();
+ LOGGER.debug(
+ "Scheduled {} data files to be deleted from manifest {}",
+ dataFileDeletes.size(),
+ manifestFile.path());
+ try {
+ // wait for all data files to be deleted, then wait for the manifest
itself to be deleted
+
CompletableFuture.allOf(dataFileDeletes.toArray(CompletableFuture[]::new))
+ .thenCompose(
+ (v) -> {
+ LOGGER
+ .atInfo()
+ .addKeyValue("manifestFile", manifestFile.path())
+ .log("All data files in manifest deleted - deleting
manifest");
+ return tryDelete(tableId, fileIO, manifestFile,
manifestFile.path(), null, 1);
+ })
+ .get();
+ return true;
+ } catch (InterruptedException e) {
+ LOGGER.error(
+ "Interrupted exception deleting data files from manifest {}",
manifestFile.path(), e);
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ LOGGER.error("Unable to delete data files from manifest {}",
manifestFile.path(), e);
+ return false;
+ }
+ }
+
+ private boolean cleanUpContentFiles(
+ List<String> contentFileBatch, FileIO fileIO, TableIdentifier tableId) {
+ List<String> validFiles =
+ contentFileBatch.stream().filter(file -> TaskUtils.exists(file,
fileIO)).toList();
+ if (validFiles.isEmpty()) {
+ LOGGER
+ .atWarn()
+ .addKeyValue("contentFileBatch", contentFileBatch.toString())
+ .addKeyValue("tableId", tableId)
+ .log("Table content cleanup task scheduled, but the none of the file
in batch exists");
+ return true;
}
+
+ // Schedule the deletion for each file asynchronously
+ List<CompletableFuture<Void>> deleteFutures =
+ validFiles.stream().map(file -> tryDelete(tableId, fileIO, null, file,
null, 1)).toList();
+
+ // Wait for all delete operations to finish
+ try {
+ CompletableFuture<Void> allDeletes =
+ CompletableFuture.allOf(deleteFutures.toArray(new
CompletableFuture[0]));
+ allDeletes.join();
+ } catch (Exception e) {
+ LOGGER
+ .atWarn()
+ .addKeyValue("contentFileBatch", contentFileBatch.toString())
+ .addKeyValue("tableId", tableId)
+ .log("Exception detected during content file batch deletion", e);
+ }
+
+ LOGGER
+ .atInfo()
+ .addKeyValue("contentFileBatch", contentFileBatch.toString())
+ .addKeyValue("tableId", tableId)
+ .log("Content file batch deletion has completed");
Review Comment:
We can either remove it or make it debug level. An info level log may not be
necessary.
##########
polaris-service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java:
##########
@@ -68,58 +70,107 @@ public boolean canHandleTask(TaskEntity task) {
@Override
public boolean handleTask(TaskEntity task) {
ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class);
- ManifestFile manifestFile =
decodeManifestData(cleanupTask.getManifestFileData());
TableIdentifier tableId = cleanupTask.getTableId();
try (FileIO authorizedFileIO = fileIOSupplier.apply(task)) {
-
- // if the file doesn't exist, we assume that another task execution was
successful, but failed
- // to drop the task entity. Log a warning and return success
- if (!TaskUtils.exists(manifestFile.path(), authorizedFileIO)) {
+ if (cleanupTask.getManifestFileData() != null) {
+ ManifestFile manifestFile =
decodeManifestData(cleanupTask.getManifestFileData());
+ return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId);
+ } else if (cleanupTask.getContentFileBatch() != null) {
+ return cleanUpContentFiles(cleanupTask.getContentFileBatch(),
authorizedFileIO, tableId);
+ } else {
LOGGER
.atWarn()
- .addKeyValue("manifestFile", manifestFile.path())
.addKeyValue("tableId", tableId)
- .log("Manifest cleanup task scheduled, but manifest file doesn't
exist");
+ .log("Cleanup task scheduled, but input file doesn't exist");
return true;
}
+ }
+ }
- ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile,
authorizedFileIO);
- List<CompletableFuture<Void>> dataFileDeletes =
- StreamSupport.stream(
- Spliterators.spliteratorUnknownSize(dataFiles.iterator(),
Spliterator.IMMUTABLE),
- false)
- .map(
- file ->
- tryDelete(
- tableId, authorizedFileIO, manifestFile,
file.path().toString(), null, 1))
- .toList();
- LOGGER.debug(
- "Scheduled {} data files to be deleted from manifest {}",
- dataFileDeletes.size(),
- manifestFile.path());
- try {
- // wait for all data files to be deleted, then wait for the manifest
itself to be deleted
-
CompletableFuture.allOf(dataFileDeletes.toArray(CompletableFuture[]::new))
- .thenCompose(
- (v) -> {
- LOGGER
- .atInfo()
- .addKeyValue("manifestFile", manifestFile.path())
- .log("All data files in manifest deleted - deleting
manifest");
- return tryDelete(
- tableId, authorizedFileIO, manifestFile,
manifestFile.path(), null, 1);
- })
- .get();
- return true;
- } catch (InterruptedException e) {
- LOGGER.error(
- "Interrupted exception deleting data files from manifest {}",
manifestFile.path(), e);
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- LOGGER.error("Unable to delete data files from manifest {}",
manifestFile.path(), e);
- return false;
- }
+ private boolean cleanUpManifestFile(
+ ManifestFile manifestFile, FileIO fileIO, TableIdentifier tableId) {
+ // if the file doesn't exist, we assume that another task execution was
successful, but
+ // failed to drop the task entity. Log a warning and return success
+ if (!TaskUtils.exists(manifestFile.path(), fileIO)) {
+ LOGGER
+ .atWarn()
+ .addKeyValue("manifestFile", manifestFile.path())
+ .addKeyValue("tableId", tableId)
+ .log("Manifest cleanup task scheduled, but manifest file doesn't
exist");
+ return true;
+ }
+
+ ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile,
fileIO);
+ List<CompletableFuture<Void>> dataFileDeletes =
+ StreamSupport.stream(
+ Spliterators.spliteratorUnknownSize(dataFiles.iterator(),
Spliterator.IMMUTABLE),
+ false)
+ .map(file -> tryDelete(tableId, fileIO, manifestFile,
file.path().toString(), null, 1))
+ .toList();
+ LOGGER.debug(
+ "Scheduled {} data files to be deleted from manifest {}",
+ dataFileDeletes.size(),
+ manifestFile.path());
+ try {
+ // wait for all data files to be deleted, then wait for the manifest
itself to be deleted
+
CompletableFuture.allOf(dataFileDeletes.toArray(CompletableFuture[]::new))
+ .thenCompose(
+ (v) -> {
+ LOGGER
+ .atInfo()
+ .addKeyValue("manifestFile", manifestFile.path())
+ .log("All data files in manifest deleted - deleting
manifest");
+ return tryDelete(tableId, fileIO, manifestFile,
manifestFile.path(), null, 1);
+ })
+ .get();
+ return true;
+ } catch (InterruptedException e) {
+ LOGGER.error(
+ "Interrupted exception deleting data files from manifest {}",
manifestFile.path(), e);
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ LOGGER.error("Unable to delete data files from manifest {}",
manifestFile.path(), e);
+ return false;
+ }
+ }
+
+ private boolean cleanUpContentFiles(
+ List<String> contentFileBatch, FileIO fileIO, TableIdentifier tableId) {
+ List<String> validFiles =
+ contentFileBatch.stream().filter(file -> TaskUtils.exists(file,
fileIO)).toList();
+ if (validFiles.isEmpty()) {
+ LOGGER
+ .atWarn()
+ .addKeyValue("contentFileBatch", contentFileBatch.toString())
+ .addKeyValue("tableId", tableId)
+ .log("Table content cleanup task scheduled, but the none of the file
in batch exists");
+ return true;
Review Comment:
Do we need to warn here if 1 out of 10 files doesn't exist?
##########
polaris-service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java:
##########
@@ -154,15 +190,32 @@ public boolean handleTask(TaskEntity cleanupTask) {
.addKeyValue("tableIdentifier", tableEntity.getTableIdentifier())
.addKeyValue("metadataLocation", tableEntity.getMetadataLocation())
.addKeyValue("taskCount", taskEntities.size())
- .log("Successfully queued tasks to delete manifests - deleting
table metadata file");
+ .log(
+ "Successfully queued tasks to delete manifests, previous
metadata, and statistics files - deleting table metadata file");
for (PolarisBaseEntity createdTask : createdTasks) {
taskExecutor.addTaskHandlerContext(createdTask.getId(),
CallContext.getCurrentContext());
}
+
fileIO.deleteFile(tableEntity.getMetadataLocation());
return true;
}
}
return false;
}
+
+ private List<List<String>> getContentFileBatch(TableMetadata tableMetadata,
FileIO fileIO) {
+ List<List<String>> result = new ArrayList<>();
+ List<String> contentFiles =
Review Comment:
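Please rename this variable as well; these are metadata and statistics files,
not content files (e.g. `metadataFiles`).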
##########
polaris-service/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java:
##########
@@ -185,12 +236,18 @@ private CompletableFuture<Void> tryDelete(
public static final class ManifestCleanupTask {
private TableIdentifier tableId;
private String manifestFileData;
+ private List<String> contentFileBatch;
Review Comment:
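Please rename this field as well; the batch holds metadata and statistics
files, not content files (e.g. `metadataFileBatch`).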
##########
polaris-service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java:
##########
@@ -142,8 +146,40 @@ public boolean handleTask(TaskEntity cleanupTask) {
// copy the internal properties, which will have
storage info
.setInternalProperties(cleanupTask.getInternalPropertiesAsMap())
.build();
- })
- .toList();
+ });
+
+ Stream<TaskEntity> contentFileCleanupTasks =
Review Comment:
Rename to `metadataFileCleanupTask`.
##########
polaris-service/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java:
##########
@@ -102,7 +106,7 @@ public boolean handleTask(TaskEntity cleanupTask) {
// read the manifest list for each snapshot. dedupe the manifest files
and schedule a
// cleanupTask
// for each manifest file and its data files to be deleted
- List<TaskEntity> taskEntities =
Review Comment:
This method is quite long. Can we split it into multiple methods like this?
```
var manifestCleanupTasks = getTaskStream(...);
var metadataFileCleanupTasks = getTaskEntityStream(cleanupTask,
tableMetadata, fileIO, tableEntity, metaStoreManager, polarisCallContext);
var taskEntities = Stream.concat(manifestCleanupTasks,
metadataFileCleanupTasks).toList();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]