rdsr commented on a change in pull request #350: Add dropTable purge option to 
Catalog API
URL: https://github.com/apache/incubator-iceberg/pull/350#discussion_r311761154
 
 

 ##########
 File path: core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
 ##########
 @@ -119,4 +133,81 @@ private Table loadMetadataTable(TableIdentifier 
identifier, TableType type) {
   protected abstract TableOperations newTableOps(TableIdentifier 
tableIdentifier);
 
   protected abstract String defaultWarehouseLocation(TableIdentifier 
tableIdentifier);
+
+  /**
+   * Drops all data and metadata files referenced by TableMetadata.
+   * <p>
+   * This should be called by dropTable implementations to clean up table 
files once the table has been dropped in the
+   * metastore.
+   *
+   * @param io a FileIO to use for deletes
+   * @param metadata the last valid TableMetadata instance for a dropped table.
+   */
+  protected static void dropTableData(FileIO io, TableMetadata metadata) {
+    // Reads and deletes are done using 
Tasks.foreach(...).suppressFailureWhenFinished to complete
+    // as much of the delete work as possible and avoid orphaned data or 
manifest files.
+
+    Set<String> manifestListsToDelete = Sets.newHashSet();
+    Set<ManifestFile> manifestsToDelete = Sets.newHashSet();
+    for (Snapshot snapshot : metadata.snapshots()) {
+      manifestsToDelete.addAll(snapshot.manifests());
+      // add the manifest list to the delete set, if present
+      if (snapshot.manifestListLocation() != null) {
+        manifestListsToDelete.add(snapshot.manifestListLocation());
+      }
+    }
+
+    LOG.info("Manifests to delete: {}", Joiner.on(", 
").join(manifestsToDelete));
+
+    // run all of the deletes
+
+    deleteFiles(io, manifestsToDelete);
+
+    Tasks.foreach(Iterables.transform(manifestsToDelete, ManifestFile::path))
+        .noRetry().suppressFailureWhenFinished()
+        .onFailure((manifest, exc) -> LOG.warn("Delete failed for manifest: 
{}", manifest, exc))
+        .run(io::deleteFile);
+
+    Tasks.foreach(manifestListsToDelete)
+        .noRetry().suppressFailureWhenFinished()
+        .onFailure((list, exc) -> LOG.warn("Delete failed for manifest list: 
{}", list, exc))
+        .run(io::deleteFile);
+
+    Tasks.foreach(metadata.file().location())
+        .noRetry().suppressFailureWhenFinished()
+        .onFailure((list, exc) -> LOG.warn("Delete failed for metadata file: 
{}", list, exc))
+        .run(io::deleteFile);
+  }
+
+  private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
+    // keep track of deleted files in a map that can be cleaned up when memory 
runs low
+    Map<String, Boolean> deletedFiles = new MapMaker()
+        .concurrencyLevel(ThreadPools.WORKER_THREAD_POOL_SIZE)
+        .weakKeys()
+        .makeMap();
+
+    Tasks.foreach(allManifests)
+        .noRetry().suppressFailureWhenFinished()
+        .executeWith(ThreadPools.getWorkerPool())
+        .onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this 
may cause orphaned data files", exc))
+        .run(manifest -> {
+          try (ManifestReader reader = 
ManifestReader.read(io.newInputFile(manifest.path()))) {
+            for (ManifestEntry entry : reader.entries()) {
 
 Review comment:
   `reader.entries()` can give back deleted entries, right? What happens in 
that case? Would it needlessly cause us to print warning exceptions?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to