szehon-ho commented on a change in pull request #2266:
URL: https://github.com/apache/iceberg/pull/2266#discussion_r588260136
##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
def new_table_ops(self, conf, database, table):
return HiveTableOperations(conf, self.get_client(), database, table)
- def drop(self, database, table):
- raise RuntimeError("Not yet implemented")
+ def drop(self, database, table, purge=False):
+ ops = self.new_table_ops(self.conf, database, table)
+ metadata = ops.current()
+
+ with self.get_client() as open_client:
+ _logger.info("Deleting {database}.{table} from Hive
Metastore".format(database=database, table=table))
+ open_client.drop_table(database, table, deleteData=False)
+
+ manifest_lists_to_delete = []
+ manifests_to_delete = itertools.chain()
+
+ if purge:
+ if metadata is not None:
+ for s in metadata.snapshots:
+ manifests_to_delete = itertools.chain(manifests_to_delete,
s.manifests)
+ if s.manifest_location is not None:
+ manifest_lists_to_delete.append(s.manifest_location())
Review comment:
Yea it took me a bit to find the mystery. In the implementation of the
property, manifest_list does "FileSystemInputFile.location" which is itself a
method. Changed the implementation of the property to fix this..
##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
def new_table_ops(self, conf, database, table):
return HiveTableOperations(conf, self.get_client(), database, table)
- def drop(self, database, table):
- raise RuntimeError("Not yet implemented")
+ def drop(self, database, table, purge=False):
+ ops = self.new_table_ops(self.conf, database, table)
+ metadata = ops.current()
+
+ with self.get_client() as open_client:
+ _logger.info("Deleting {database}.{table} from Hive
Metastore".format(database=database, table=table))
+ open_client.drop_table(database, table, deleteData=False)
+
+ manifest_lists_to_delete = []
+ manifests_to_delete = itertools.chain()
+
+ if purge:
+ if metadata is not None:
+ for s in metadata.snapshots:
+ manifests_to_delete = itertools.chain(manifests_to_delete,
s.manifests)
+ if s.manifest_location is not None:
+ manifest_lists_to_delete.append(s.manifest_location())
+
+ # Make a copy, as it is drained as we explore the manifest to list
files.
+ (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+ with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+ cpu_count())) as delete_pool:
+ delete_pool.map(self.__delete_file, self.__get_files(manifests))
+ delete_pool.map(self.__delete_file, (m.manifest_path for m in
manifests_to_delete))
+ delete_pool.map(self.__delete_file, manifest_lists_to_delete)
+ delete_pool.map(self.__delete_file,
[ops.current_metadata_location])
+
+ def __get_files(self, manifests):
+ return
itertools.chain.from_iterable((self.__get_data_files_by_manifest(m) for m in
manifests))
+
+ def __get_data_files_by_manifest(self, manifest):
+ file = FileSystemInputFile.from_location(manifest.manifest_path,
self.conf)
Review comment:
I tried for awhile, I hit some existing issues like:
` File "/iceberg/python/iceberg/core/base_snapshot.py", line 131, in
get_filtered_manifest
reader = ManifestReader.read(self.ops.new_input_file(path))
AttributeError: 'BaseSnapshot' object has no attribute 'ops'
`
I could fix those, but was wondering if it's not better to just have these
two lines here as well, if it's pretty static logic (save the reference to
self.conf)? DataTableScan does the same thing. The problem with calling the
snapshot version is that then I'd have to pass the snapshot along with the
manifest in my iterator stuff and it makes it slightly even less readable.
##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
def new_table_ops(self, conf, database, table):
return HiveTableOperations(conf, self.get_client(), database, table)
- def drop(self, database, table):
- raise RuntimeError("Not yet implemented")
+ def drop(self, database, table, purge=False):
+ ops = self.new_table_ops(self.conf, database, table)
+ metadata = ops.current()
+
+ with self.get_client() as open_client:
+ _logger.info("Deleting {database}.{table} from Hive
Metastore".format(database=database, table=table))
+ open_client.drop_table(database, table, deleteData=False)
+
+ manifest_lists_to_delete = []
+ manifests_to_delete = itertools.chain()
+
+ if purge:
+ if metadata is not None:
+ for s in metadata.snapshots:
+ manifests_to_delete = itertools.chain(manifests_to_delete,
s.manifests)
+ if s.manifest_location is not None:
+ manifest_lists_to_delete.append(s.manifest_location())
+
+ # Make a copy, as it is drained as we explore the manifest to list
files.
+ (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+ with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+ cpu_count())) as delete_pool:
+ delete_pool.map(self.__delete_file, self.__get_files(manifests))
+ delete_pool.map(self.__delete_file, (m.manifest_path for m in
manifests_to_delete))
+ delete_pool.map(self.__delete_file, manifest_lists_to_delete)
+ delete_pool.map(self.__delete_file,
[ops.current_metadata_location])
+
+ def __get_files(self, manifests):
+ return
itertools.chain.from_iterable((self.__get_data_files_by_manifest(m) for m in
manifests))
+
+ def __get_data_files_by_manifest(self, manifest):
+ file = FileSystemInputFile.from_location(manifest.manifest_path,
self.conf)
+ reader = ManifestReader.read(file)
+ return (i.path() for i in reader.iterator())
+
+ def __delete_file(self, path):
Review comment:
Done
##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
def new_table_ops(self, conf, database, table):
return HiveTableOperations(conf, self.get_client(), database, table)
- def drop(self, database, table):
- raise RuntimeError("Not yet implemented")
+ def drop(self, database, table, purge=False):
+ ops = self.new_table_ops(self.conf, database, table)
+ metadata = ops.current()
+
+ with self.get_client() as open_client:
+ _logger.info("Deleting {database}.{table} from Hive
Metastore".format(database=database, table=table))
+ open_client.drop_table(database, table, deleteData=False)
+
+ manifest_lists_to_delete = []
+ manifests_to_delete = itertools.chain()
+
+ if purge:
+ if metadata is not None:
+ for s in metadata.snapshots:
+ manifests_to_delete = itertools.chain(manifests_to_delete,
s.manifests)
+ if s.manifest_location is not None:
+ manifest_lists_to_delete.append(s.manifest_location())
+
+ # Make a copy, as it is drained as we explore the manifest to list
files.
+ (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+ with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+ cpu_count())) as delete_pool:
+ delete_pool.map(self.__delete_file, self.__get_files(manifests))
+ delete_pool.map(self.__delete_file, (m.manifest_path for m in
manifests_to_delete))
Review comment:
Added a deduplication for the iterator
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]