rdblue commented on a change in pull request #350: Add dropTableAndData to
Catalog API
URL: https://github.com/apache/incubator-iceberg/pull/350#discussion_r310776282
##########
File path: core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
##########
@@ -92,6 +106,91 @@ public Table loadTable(TableIdentifier identifier) {
return new BaseTable(ops, identifier.toString());
}
+ @Override
+ public boolean dropTableAndData(TableIdentifier identifier) {
+ // load the table state to get the data files to delete
+ TableOperations ops = newTableOps(identifier);
+ if (ops.current() == null) {
+ return false;
+ }
+
+ // drop the table
+ if (!dropTable(identifier)) {
+ // the table was dropped by another process after it was loaded, do
nothing
+ return false;
+ }
+
+ // Reads and deletes are done using
Tasks.foreach(...).suppressFailureWhenFinished to complete
+ // as much of the delete work as possible and avoid orphaned data or
manifest files.
+
+ Set<String> manifestListsToDelete = Sets.newHashSet();
+ Set<ManifestFile> manifestsToDelete = Sets.newHashSet();
+ for (Snapshot snapshot : ops.current().snapshots()) {
+ manifestsToDelete.addAll(snapshot.manifests());
+ // add the manifest list to the delete set, if present
+ if (snapshot.manifestListLocation() != null) {
+ manifestListsToDelete.add(snapshot.manifestListLocation());
+ }
+ }
+
+ LOG.info("Manifests to delete: {}", Joiner.on(",
").join(manifestsToDelete));
+
+ // run all of the deletes
+
+ FileIO io = ops.io();
+ deleteFiles(io, manifestsToDelete);
+
+ Tasks.foreach(Iterables.transform(manifestsToDelete, ManifestFile::path))
+ .noRetry().suppressFailureWhenFinished()
+ .onFailure((manifest, exc) -> LOG.warn("Delete failed for manifest:
{}", manifest, exc))
+ .run(io::deleteFile);
+
+ Tasks.foreach(manifestListsToDelete)
+ .noRetry().suppressFailureWhenFinished()
+ .onFailure((list, exc) -> LOG.warn("Delete failed for manifest list:
{}", list, exc))
+ .run(io::deleteFile);
+
+ Tasks.foreach(ops.current().file().location())
+ .noRetry().suppressFailureWhenFinished()
+ .onFailure((list, exc) -> LOG.warn("Delete failed for metadata file:
{}", list, exc))
+ .run(io::deleteFile);
+
+ return true;
+ }
+
+ private void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
+ // keep track of deleted files in a map that can be cleaned up when memory
runs low
+ Map<String, Boolean> deletedFiles = new MapMaker()
+ .concurrencyLevel(ThreadPools.WORKER_THREAD_POOL_SIZE)
+ .weakKeys()
+ .makeMap();
+
+ Tasks.foreach(allManifests)
+ .noRetry().suppressFailureWhenFinished()
+ .executeWith(ThreadPools.getWorkerPool())
+ .onFailure((item, exc) ->
+ LOG.warn("Failed to get deleted files: this may cause orphaned
data files", exc)
+ ).run(manifest -> {
+ try (ManifestReader reader =
ManifestReader.read(io.newInputFile(manifest.path()))) {
+ for (ManifestEntry entry : reader.entries()) {
+ // intern the file path because the weak key map uses identity (==)
instead of equals
+ String path = entry.file().path().toString().intern();
Review comment:
@danielcweeks and I looked into this offline and concluded that it is now
safe to use `intern` in this case. As of Java 7, the intern table is kept on
the heap and strings in the table are eligible for garbage collection.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]