This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 4595371ea IMPALA-11821: Adjusting manifest_length and absolute paths
in case of metadata rewrite
4595371ea is described below
commit 4595371ea29abdcf0707e17069bfcf984ca5b8e6
Author: Gergely Fürnstáhl <[email protected]>
AuthorDate: Wed Jan 11 16:09:11 2023 +0100
IMPALA-11821: Adjusting manifest_length and absolute paths in case of
metadata rewrite
testdata/bin/rewrite-iceberg-metadata.py rewrites manifest and snapshot
files using the provided prefix for file paths. Snapshot files store
the length of manifest files as well; this needs to be adjusted too.
Additionally, improved the path rewrite to handle absolute
paths correctly, and added pretty-printing of the metadata JSON files.
Testing:
- Tested locally, manually verified the rewrites
- Tested on Ozone, automatically rewriting the test data and running
test_iceberg.py
Change-Id: I89b9208f25552012cc1ab16fa60a819dd5a683d9
Reviewed-on: http://gerrit.cloudera.org:8080/19412
Reviewed-by: Noemi Pap-Takacs <[email protected]>
Reviewed-by: Michael Smith <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
testdata/bin/rewrite-iceberg-metadata.py | 54 +++++++++++++++++++++++++++-----
tests/common/skip.py | 2 +-
tests/query_test/test_iceberg.py | 2 --
3 files changed, 47 insertions(+), 11 deletions(-)
diff --git a/testdata/bin/rewrite-iceberg-metadata.py
b/testdata/bin/rewrite-iceberg-metadata.py
index ec165f1d7..1ccee49a7 100755
--- a/testdata/bin/rewrite-iceberg-metadata.py
+++ b/testdata/bin/rewrite-iceberg-metadata.py
@@ -32,31 +32,55 @@ if len(args) < 2:
prefix = args[0]
+# Easier to cache it instead of trying to resolve the manifest files paths
+file_size_cache = {}
+
+
+def generate_new_path(prefix, file_path):
+ """ Hive generates metadata with absolute paths.
+ This method relativizes the path and applies a new prefix."""
+ start_directory = "/test-warehouse"
+ start = file_path.find(start_directory)
+ if start == -1:
+ raise RuntimeError("{} is not found in file path:{}".format(
+ start_directory, file_path))
+ return prefix + file_path[start:]
+
def add_prefix_to_snapshot(snapshot):
if 'manifest-list' in snapshot:
- snapshot['manifest-list'] = prefix + snapshot['manifest-list']
+ snapshot['manifest-list'] = generate_new_path(prefix,
snapshot['manifest-list'])
if 'manifests' in snapshot:
- snapshot['manifests'] = map(lambda m: prefix + m, snapshot['manifests'])
+ snapshot['manifests'] = map(lambda m: generate_new_path(prefix, m),
+ snapshot['manifests'])
return snapshot
def add_prefix_to_mlog(metadata_log):
- metadata_log['metadata-file'] = prefix + metadata_log['metadata-file']
+ metadata_log['metadata-file'] = generate_new_path(prefix,
metadata_log['metadata-file'])
return metadata_log
def add_prefix_to_snapshot_entry(entry):
if 'manifest_path' in entry:
- entry['manifest_path'] = prefix + entry['manifest_path']
+ entry['manifest_path'] = generate_new_path(prefix, entry['manifest_path'])
if 'data_file' in entry:
- entry['data_file']['file_path'] = prefix + entry['data_file']['file_path']
+ entry['data_file']['file_path'] = generate_new_path(prefix,
+ entry['data_file']['file_path'])
+ return entry
+
+
+def fix_manifest_length(entry):
+ if 'manifest_path' in entry and 'manifest_length' in entry:
+ filename = entry['manifest_path'].split('/')[-1]
+ if filename in file_size_cache:
+ entry['manifest_length'] = file_size_cache[filename]
return entry
for arg in args[1:]:
# Update metadata.json
- for mfile in glob.glob(os.path.join(arg, 'v*.metadata.json')):
+ for mfile in glob.glob(os.path.join(arg, '*.metadata.json')):
with open(mfile, 'r') as f:
metadata = json.load(f)
@@ -70,7 +94,7 @@ for arg in args[1:]:
continue
# metadata: required
- metadata['location'] = prefix + metadata['location']
+ metadata['location'] = generate_new_path(prefix, metadata['location'])
# snapshots: optional
if 'snapshots' in metadata:
@@ -81,7 +105,7 @@ for arg in args[1:]:
metadata['metadata-log'] = map(add_prefix_to_mlog,
metadata['metadata-log'])
with open(mfile + '.tmp', 'w') as f:
- json.dump(metadata, f)
+ json.dump(metadata, f, indent=2)
os.rename(mfile + '.tmp', mfile)
for afile in glob.glob(os.path.join(arg, '*.avro')):
@@ -95,3 +119,17 @@ for arg in args[1:]:
for line in lines:
writer.append(line)
os.rename(afile + '.tmp', afile)
+ filename = afile.split('/')[-1]
+ file_size_cache[filename] = os.path.getsize(afile)
+
+ for snapfile in glob.glob(os.path.join(arg, 'snap*.avro')):
+ with open(snapfile, 'rb') as f:
+ with DataFileReader(f, DatumReader()) as reader:
+ schema = reader.datum_reader.writers_schema
+ lines = map(fix_manifest_length, reader)
+
+ with open(snapfile + '.tmp', 'wb') as f:
+ with DataFileWriter(f, DatumWriter(), schema) as writer:
+ for line in lines:
+ writer.append(line)
+ os.rename(snapfile + '.tmp', snapfile)
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 648204395..85d2c678d 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -104,7 +104,7 @@ class SkipIf:
sfs_unsupported = pytest.mark.skipif(not (IS_HDFS or IS_S3 or IS_ABFS or
IS_ADLS
or IS_GCS), reason="Hive support for sfs+ is limited, HIVE-26757")
hardcoded_uris = pytest.mark.skipif(not IS_HDFS,
- reason="Iceberg hardcodes the full URI in parquet delete files and
metadata files")
+ reason="Iceberg delete files hardcode the full URI in parquet files")
not_ec = pytest.mark.skipif(not IS_EC, reason="Erasure Coding needed")
no_secondary_fs = pytest.mark.skipif(not SECONDARY_FILESYSTEM,
reason="Secondary filesystem needed")
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 062a6cc12..9ae75885f 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -842,7 +842,6 @@ class TestIcebergTable(IcebergTestSuite):
self.run_test_case('QueryTest/iceberg-multiple-storage-locations-table',
vector, unique_database)
- @SkipIf.hardcoded_uris
def test_mixed_file_format(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-mixed-file-format', vector,
unique_database)
@@ -958,7 +957,6 @@ class TestIcebergTable(IcebergTestSuite):
def test_virtual_columns(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-virtual-columns', vector,
unique_database)
- @SkipIf.hardcoded_uris
def test_avro_file_format(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-avro', vector, unique_database)