szehon-ho commented on a change in pull request #2266:
URL: https://github.com/apache/iceberg/pull/2266#discussion_r588264301
##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
def new_table_ops(self, conf, database, table):
return HiveTableOperations(conf, self.get_client(), database, table)
- def drop(self, database, table):
- raise RuntimeError("Not yet implemented")
+ def drop(self, database, table, purge=False):
+ ops = self.new_table_ops(self.conf, database, table)
+ metadata = ops.current()
+
+ with self.get_client() as open_client:
+ _logger.info("Deleting {database}.{table} from Hive
Metastore".format(database=database, table=table))
+ open_client.drop_table(database, table, deleteData=False)
+
+ manifest_lists_to_delete = []
+ manifests_to_delete = itertools.chain()
+
+ if purge:
+ if metadata is not None:
+ for s in metadata.snapshots:
+ manifests_to_delete = itertools.chain(manifests_to_delete,
s.manifests)
+ if s.manifest_location is not None:
+ manifest_lists_to_delete.append(s.manifest_location())
+
+ # Make a copy, as it is drained as we explore the manifest to list
files.
+ (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+ with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+ cpu_count())) as delete_pool:
+ delete_pool.map(self.__delete_file, self.__get_files(manifests))
+ delete_pool.map(self.__delete_file, (m.manifest_path for m in
manifests_to_delete))
+ delete_pool.map(self.__delete_file, manifest_lists_to_delete)
+ delete_pool.map(self.__delete_file,
[ops.current_metadata_location])
+
+ def __get_files(self, manifests):
+ return
itertools.chain.from_iterable((self.__get_data_files_by_manifest(m) for m in
manifests))
+
+ def __get_data_files_by_manifest(self, manifest):
+ file = FileSystemInputFile.from_location(manifest.manifest_path,
self.conf)
Review comment:
I gave this a try. First I hit an error:
File "/iceberg/python/iceberg/core/base_snapshot.py", line 131, in
get_filtered_manifest
reader = ManifestReader.read(self.ops.new_input_file(path))
AttributeError: 'BaseSnapshot' object has no attribute 'ops'
I could fix the error, but I'm wondering whether it's worth it just to save two lines.
DataTableScan also has these two lines. This logic is mostly static, save for
the self.conf reference. Otherwise I'd have to pass the snapshot around along with
the manifest in the iterators, which makes the code even harder to read.
##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
def new_table_ops(self, conf, database, table):
return HiveTableOperations(conf, self.get_client(), database, table)
- def drop(self, database, table):
- raise RuntimeError("Not yet implemented")
+ def drop(self, database, table, purge=False):
+ ops = self.new_table_ops(self.conf, database, table)
+ metadata = ops.current()
+
+ with self.get_client() as open_client:
+ _logger.info("Deleting {database}.{table} from Hive
Metastore".format(database=database, table=table))
+ open_client.drop_table(database, table, deleteData=False)
+
+ manifest_lists_to_delete = []
+ manifests_to_delete = itertools.chain()
+
+ if purge:
+ if metadata is not None:
+ for s in metadata.snapshots:
+ manifests_to_delete = itertools.chain(manifests_to_delete,
s.manifests)
+ if s.manifest_location is not None:
+ manifest_lists_to_delete.append(s.manifest_location())
+
+ # Make a copy, as it is drained as we explore the manifest to list
files.
+ (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+ with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+ cpu_count())) as delete_pool:
+ delete_pool.map(self.__delete_file, self.__get_files(manifests))
+ delete_pool.map(self.__delete_file, (m.manifest_path for m in
manifests_to_delete))
+ delete_pool.map(self.__delete_file, manifest_lists_to_delete)
+ delete_pool.map(self.__delete_file,
[ops.current_metadata_location])
+
+ def __get_files(self, manifests):
+ return
itertools.chain.from_iterable((self.__get_data_files_by_manifest(m) for m in
manifests))
+
+ def __get_data_files_by_manifest(self, manifest):
+ file = FileSystemInputFile.from_location(manifest.manifest_path,
self.conf)
+ reader = ManifestReader.read(file)
+ return (i.path() for i in reader.iterator())
+
+ def __delete_file(self, path):
Review comment:
Done
##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
def new_table_ops(self, conf, database, table):
return HiveTableOperations(conf, self.get_client(), database, table)
- def drop(self, database, table):
- raise RuntimeError("Not yet implemented")
+ def drop(self, database, table, purge=False):
+ ops = self.new_table_ops(self.conf, database, table)
+ metadata = ops.current()
+
+ with self.get_client() as open_client:
+ _logger.info("Deleting {database}.{table} from Hive
Metastore".format(database=database, table=table))
+ open_client.drop_table(database, table, deleteData=False)
+
+ manifest_lists_to_delete = []
+ manifests_to_delete = itertools.chain()
+
+ if purge:
+ if metadata is not None:
+ for s in metadata.snapshots:
+ manifests_to_delete = itertools.chain(manifests_to_delete,
s.manifests)
+ if s.manifest_location is not None:
+ manifest_lists_to_delete.append(s.manifest_location())
+
+ # Make a copy, as it is drained as we explore the manifest to list
files.
+ (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+ with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+ cpu_count())) as delete_pool:
+ delete_pool.map(self.__delete_file, self.__get_files(manifests))
+ delete_pool.map(self.__delete_file, (m.manifest_path for m in
manifests_to_delete))
Review comment:
Added de-duplication in the iterator.
##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
def new_table_ops(self, conf, database, table):
return HiveTableOperations(conf, self.get_client(), database, table)
- def drop(self, database, table):
- raise RuntimeError("Not yet implemented")
+ def drop(self, database, table, purge=False):
Review comment:
Done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]