rymurr commented on a change in pull request #2266:
URL: https://github.com/apache/iceberg/pull/2266#discussion_r589037716



##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,12 +42,59 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False) -> None:
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
 
-    def get_client(self):
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive 
Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        if purge:
+            if metadata is not None:
+                manifest_lists_to_delete = []
+                manifests_to_delete = itertools.chain()
+
+                for s in metadata.snapshots:
+                    manifests_to_delete = itertools.chain(manifests_to_delete, 
(m for m in s.manifests))
+                    if s.manifest_location is not None:
+                        manifest_lists_to_delete.append(s.manifest_location)
+
+                # Make a copy, as it is drained as we explore the manifest to 
list files.
+                (manifests, manifests_to_delete) = 
itertools.tee(manifests_to_delete)
+
+                with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+                          cpu_count())) as delete_pool:
+                    delete_pool.map(self._delete_file(ops), 
self._unique(self._get_data_files(manifests)))
+                    delete_pool.map(self._delete_file(ops), 
self._unique(m.manifest_path for m in manifests_to_delete))
+                    delete_pool.map(self._delete_file(ops), 
self._unique(manifest_lists_to_delete))
+                    delete_pool.map(self._delete_file(ops), 
[ops.current_metadata_location])
+
+    def get_client(self) -> HMSClient:
         from urllib.parse import urlparse
         metastore_uri = urlparse(self.conf[HiveTables.THRIFT_URIS])
 
         client = hmsclient.HMSClient(host=metastore_uri.hostname, 
port=metastore_uri.port)
         return client
+
+    def _get_data_files(self, manifests) -> Iterator[str]:
+        return 
itertools.chain.from_iterable(self._get_data_files_by_manifest(m) for m in 
manifests)
+
+    def _get_data_files_by_manifest(self, manifest) -> Iterator[str]:
+        file = FileSystemInputFile.from_location(manifest.manifest_path, 
self.conf)
+        reader = ManifestReader.read(file)
+        return (i.path() for i in reader.iterator())
+
+    @staticmethod
+    def _delete_file(ops):
+        return lambda path: (
+            _logger.info("Deleting file: {path}".format(path=path)),
+            ops.delete_file(path))
+
+    @staticmethod
+    def _unique(iterable: Iterator) -> Iterator:

Review comment:
      I think it might be easier to just catch an exception in `_delete_file`

##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,12 +42,59 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False) -> None:
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
 
-    def get_client(self):
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive 
Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        if purge:
+            if metadata is not None:

Review comment:
      I had something a bit simpler in mind. I think we can avoid some of the 
itertools machinery by just deleting files as we find them. This also implies 
that the `_delete_file` method must tolerate a file being deleted twice
   
   ``` python
               if metadata is not None:
                   with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
                             cpu_count())) as delete_pool:
                       manifest_lists_to_delete = 
[ops.current_metadata_location]
   
                       for s in metadata.snapshots:
                           delete_pool.map(self._delete_file(ops), (i.path() 
for i in s.get_filtered_manifests().iterator()))
                           delete_pool.map(self._delete_file(ops), s.manifests)
                           if s.manifest_location is not None:
                               
manifest_lists_to_delete.append(s.manifest_location)
                       delete_pool.map(self._delete_file(ops), 
manifest_lists_to_delete)
   ```
   
   I didn't test this code so it may not be quite right. And some manifests 
mentioned in later snapshots may already be deleted so can't be read (but can 
then just be skipped). 

##########
File path: python/tests/hive/test_hive_tables.py
##########
@@ -157,3 +157,84 @@ def test_create_tables(client, current_call, 
base_scan_schema, base_scan_partiti
     current_call.return_value = tbl[0].args[0].parameters['metadata_location']
 
     tables.load("test.test_123")
+
+
[email protected]("iceberg.hive.HiveTables._delete_files")
[email protected]("iceberg.hive.HiveTableOperations.refresh_from_metadata_location")
[email protected]("iceberg.hive.HiveTableOperations.current")
[email protected]("iceberg.hive.HiveTables.get_client")
+def test_drop_tables(client, metadata, refresh_call, tmpdir):
+
+    parameters = {"table_type": "ICEBERG",
+                  "partition_spec": [],
+                  "metadata_location": "s3://path/to/iceberg.metadata.json"}
+
+    client.return_value.__enter__.return_value.get_table.return_value = 
MockHMSTable(parameters)
+    conf = {"hive.metastore.uris": 'thrift://hms:port',
+            "hive.metastore.warehouse.dir": tmpdir}
+    tables = HiveTables(conf)
+    tables.drop("test", "test_123", purge=False)
+    
client.return_value.__enter__.return_value.drop_table.assert_called_with("test",
 "test_123", deleteData=False)
+
+
+class MockTableOperations(object):
+    def __init__(self, location):
+        self.deleted = set()
+        self.current_metadata_location = location
+
+    def current(self):
+        return MockMetadata()
+
+    def delete_file(self, path):
+        self.deleted.add(path)
+
+
+class MockMetadata(object):

Review comment:
       I think these could be useful in `test_helpers.py`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to