szehon-ho commented on a change in pull request #2266:
URL: https://github.com/apache/iceberg/pull/2266#discussion_r588266578



##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
+
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive 
Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        manifest_lists_to_delete = []
+        manifests_to_delete = itertools.chain()
+
+        if purge:
+            if metadata is not None:
+                for s in metadata.snapshots:
+                    manifests_to_delete = itertools.chain(manifests_to_delete, 
s.manifests)
+                    if s.manifest_location is not None:
+                        manifest_lists_to_delete.append(s.manifest_location())
+
+        # Make a copy, as it is drained as we explore the manifest to list 
files.
+        (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+        with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+                                cpu_count())) as delete_pool:
+            delete_pool.map(self.__delete_file, self.__get_files(manifests))
+            delete_pool.map(self.__delete_file, (m.manifest_path for m in 
manifests_to_delete))
+            delete_pool.map(self.__delete_file, manifest_lists_to_delete)
+            delete_pool.map(self.__delete_file, 
[ops.current_metadata_location])
+
+    def __get_files(self, manifests):
+        return 
itertools.chain.from_iterable((self.__get_data_files_by_manifest(m) for m in 
manifests))
+
+    def __get_data_files_by_manifest(self, manifest):
+        file = FileSystemInputFile.from_location(manifest.manifest_path, 
self.conf)
+        reader = ManifestReader.read(file)
+        return (i.path() for i in reader.iterator())
+
+    def __delete_file(self, path):

Review comment:
       By the way the TableOps implementation was passing too many arguments, 
fixed it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to