szehon-ho commented on a change in pull request #2266:
URL: https://github.com/apache/iceberg/pull/2266#discussion_r588262015



##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
+
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive 
Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        manifest_lists_to_delete = []
+        manifests_to_delete = itertools.chain()
+
+        if purge:
+            if metadata is not None:
+                for s in metadata.snapshots:
+                    manifests_to_delete = itertools.chain(manifests_to_delete, 
s.manifests)
+                    if s.manifest_location is not None:
+                        manifest_lists_to_delete.append(s.manifest_location())
+
+        # Make a copy, as it is drained as we explore the manifest to list 
files.
+        (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+        with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+                                cpu_count())) as delete_pool:
+            delete_pool.map(self.__delete_file, self.__get_files(manifests))
+            delete_pool.map(self.__delete_file, (m.manifest_path for m in 
manifests_to_delete))
+            delete_pool.map(self.__delete_file, manifest_lists_to_delete)
+            delete_pool.map(self.__delete_file, 
[ops.current_metadata_location])
+
+    def __get_files(self, manifests):
+        return 
itertools.chain.from_iterable((self.__get_data_files_by_manifest(m) for m in 
manifests))
+
+    def __get_data_files_by_manifest(self, manifest):
+        file = FileSystemInputFile.from_location(manifest.manifest_path, 
self.conf)

Review comment:
       I tried for a while, but I hit some existing issues like:
   `  File "/iceberg/python/iceberg/core/base_snapshot.py", line 131, in 
get_filtered_manifest
       reader = ManifestReader.read(self.ops.new_input_file(path))
   AttributeError: 'BaseSnapshot' object has no attribute 'ops'
   `
   I could fix those, but I was wondering whether it would be better to just have these 
two lines here as well, since it's pretty static logic (save the reference to 
self.conf)?  DataTableScan does the same thing.  The problem with calling the 
snapshot version is that I'd then have to pass the snapshot along with the 
manifest through my iterator code, which makes it slightly less readable.

##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
+
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive 
Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        manifest_lists_to_delete = []
+        manifests_to_delete = itertools.chain()
+
+        if purge:
+            if metadata is not None:
+                for s in metadata.snapshots:
+                    manifests_to_delete = itertools.chain(manifests_to_delete, 
s.manifests)
+                    if s.manifest_location is not None:
+                        manifest_lists_to_delete.append(s.manifest_location())
+
+        # Make a copy, as it is drained as we explore the manifest to list 
files.
+        (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+        with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+                                cpu_count())) as delete_pool:
+            delete_pool.map(self.__delete_file, self.__get_files(manifests))
+            delete_pool.map(self.__delete_file, (m.manifest_path for m in 
manifests_to_delete))
+            delete_pool.map(self.__delete_file, manifest_lists_to_delete)
+            delete_pool.map(self.__delete_file, 
[ops.current_metadata_location])
+
+    def __get_files(self, manifests):
+        return 
itertools.chain.from_iterable((self.__get_data_files_by_manifest(m) for m in 
manifests))
+
+    def __get_data_files_by_manifest(self, manifest):
+        file = FileSystemInputFile.from_location(manifest.manifest_path, 
self.conf)
+        reader = ManifestReader.read(file)
+        return (i.path() for i in reader.iterator())
+
+    def __delete_file(self, path):

Review comment:
       Done

##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
+
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive 
Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        manifest_lists_to_delete = []
+        manifests_to_delete = itertools.chain()
+
+        if purge:
+            if metadata is not None:
+                for s in metadata.snapshots:
+                    manifests_to_delete = itertools.chain(manifests_to_delete, 
s.manifests)
+                    if s.manifest_location is not None:
+                        manifest_lists_to_delete.append(s.manifest_location())
+
+        # Make a copy, as it is drained as we explore the manifest to list 
files.
+        (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+        with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+                                cpu_count())) as delete_pool:
+            delete_pool.map(self.__delete_file, self.__get_files(manifests))
+            delete_pool.map(self.__delete_file, (m.manifest_path for m in 
manifests_to_delete))

Review comment:
       Added deduplication for the iterator.

##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to